Task Queue 任務隊列 – 生產者與消費者

在開發看盤軟體時,時常會需要用到一種 Task Queue,可以將收到的任務存放起來,供稍後取出來處理,例如收到股價資料要依序處理。

又或者在觀察者模式中訂閱了某檔個股,而個股會有許多事件需要通知觀察者,例如歷史股價準備完成、即時股價到達或資料有誤需重抓等,觀察者可能來不及處理,需要先將事件存入 Task Queue 以便稍後處理。

另外不難想像 Task Queue 的兩端就像是生產者與消費者的關係,且可能同時存在多個生產者及多個消費者,因此 Task Queue 還必須是 Thread-safe。

Task Queue
任務隊列

以下為精簡後的 code,並且假設 Task Queue 存放的型別為 std::string,讓我們來看程式的關鍵部分,也就是 Put function 和 Take function 的寫法:

#include 
#include 
#include 
#include 

class CTaskQueue
{
public:
	void Put(const std::string& s)
	{
		{
			std::lock_guard lock(m_mutex);
			m_queue.push(s);
		}

		m_condNotEmpty.notify_one();
	}

	std::string Take()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty();
			});

		std::string s = m_queue.front();
		m_queue.pop();
		return s;
	}

private:
	std::mutex m_mutex;
	std::queue m_queue;
	std::condition_variable m_condNotEmpty;
};

首先來看 Take function,在 m_queue 為空的時候,消費者 thread 不能取得資料,因此有必要使用 std::condition_variable 根據 m_queue 是否為空的條件來暫停消費者 thread 執行,直到 m_queue 不為空時才被喚醒。注意這裡使用 std::condition_variable 的 wait 成員函式,當表達式為 true 時會繼續執行,因此要使用 !this->m_queue.empty() 來當作判斷式。

還有要注意的一點是,當消費者 thread 被暫停後,m_mutex 會被釋放,不然程式就會永遠卡住了,請各位暫停並思考一下此處奧妙,或繼續往下看也會有解答。

那麼由誰來喚醒消費者 thread 呢?接下來就將目光移到 Put function。因為需要 Thread-safe,所以 m_queue.push(s) 需要使用 std::mutex 來保護,這沒有問題,並且請注意,這裡的生產者 thread 是可以搶到 m_mutex 的,這也是為什麼 std::condition_variable 條件不滿足需要釋放 m_mutex,不釋放則程式會永遠卡住。

當生產者 thread 將資料放進 Task Queue 後,m_condNotEmpty.notify_one() 會喚醒一個暫停中的消費者 thread,注意此處 m_condNotEmpty.notify_one() 的呼叫是在 { … } 之外,不在外面的話,因為 m_mutex 尚未被 Put function 釋放,有可能 Take function 內被喚醒的消費者 thread 會搶不到 m_mutex,又會短暫進入暫停,程式執行效率會稍慢一些。

好了,這就是全部了,謝謝各位。以下附上完整的程式碼,完整的程式碼複雜許多,未來再細細解釋吧!

#pragma once

#include 
#include 
#include 

template
class CTaskQueue final
{
public:
	CTaskQueue() : m_bStop(true), m_mutex(), m_queue(), m_condNotEmpty() {}
	CTaskQueue(const CTaskQueue&) = delete;
	CTaskQueue(CTaskQueue&&) = delete;
	CTaskQueue& operator=(const CTaskQueue&) = delete;
	CTaskQueue& operator=(CTaskQueue&&) = delete;
	~CTaskQueue()
	{
		Stop();
	}

public:
	void Start()
	{
		std::lock_guard lock(m_mutex);
		m_bStop = false;
	}

	void Stop()
	{
		{
			std::lock_guard lock(m_mutex);
			m_bStop = true;
		}

		m_condNotEmpty.notify_all();
	}

	void Clear()
	{
		std::lock_guard lock(m_mutex);
		m_queue.clear();
	}

	void Put(const T& t)
	{
		Add(t);
	}

	void Put(T&& t)
	{
		Add(std::move(t));
	}

	T Take()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty() || this->m_bStop;
			});

		if (m_bStop)
		{
			return T();
		}

		auto t = std::move(m_queue.front());
		m_queue.pop();
		return t;
	}

	auto TakeAll()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty() || this->m_bStop;
			});

		if (m_bStop)
		{
			return std::queue();
		}

		return std::move(m_queue);
	}

private:
	template
	void Add(T&& t)
	{
		{
			std::lock_guard lock(m_mutex);
			if (m_bStop)
			{
				return;
			}

			m_queue.push(std::forward(t));
		}

		m_condNotEmpty.notify_one();
	}

private:
	bool m_bStop;
	std::mutex m_mutex;
	std::queue m_queue;
	std::condition_variable m_condNotEmpty;
};