在開發看盤軟體時,時常會需要用到一種 Task Queue,可以將收到的任務存放起來,供稍後取出來處理,例如收到股價資料要依序處理。
又或者在觀察者模式中訂閱了某檔個股,而個股會有許多事件需要通知觀察者,例如歷史股價準備完成、即時股價到達或資料有誤需重抓等,觀察者可能來不及處理,需要先將事件存入 Task Queue 以便稍後處理。
另外不難想像 Task Queue 的兩端就像是生產者與消費者的關係,且可能同時存在多個生產者及多個消費者,因此 Task Queue 還必須是 Thread-safe。
以下為精簡後的 code,並且假設 Task Queue 存放的型別為 std::string,讓我們來看程式的關鍵部分,也就是 Put function 和 Take function 的寫法:
#include
#include
#include
#include
class CTaskQueue
{
public:
void Put(const std::string& s)
{
{
std::lock_guard lock(m_mutex);
m_queue.push(s);
}
m_condNotEmpty.notify_one();
}
std::string Take()
{
std::unique_lock lock(m_mutex);
m_condNotEmpty.wait(lock, [this]
{
return !this->m_queue.empty();
});
std::string s = m_queue.front();
m_queue.pop();
return s;
}
private:
std::mutex m_mutex;
std::queue m_queue;
std::condition_variable m_condNotEmpty;
};
首先來看 Take function,在 m_queue 為空的時候,消費者 thread 不能取得資料,因此有必要使用 std::condition_variable 根據 m_queue 是否為空的條件來暫停消費者 thread 執行,直到 m_queue 不為空時才被喚醒。注意這裡使用 std::condition_variable 的 wait 成員函式,當表達式為 true 時會繼續執行,因此要使用 !this->m_queue.empty() 來當作判斷式。
還有要注意的一點是,當消費者 thread 被暫停後,m_mutex 會被釋放,不然程式就會永遠卡住了,請各位暫停並思考一下此處奧妙,或繼續往下看也會有解答。
那麼由誰來喚醒消費者 thread 呢?接下來就將目光移到 Put function。因為需要 Thread-safe,所以 m_queue.push(s) 需要使用 std::mutex 來保護,這沒有問題,並且請注意,這裡的生產者 thread 是可以搶到 m_mutex 的,這也是為什麼 std::condition_variable 條件不滿足需要釋放 m_mutex,不釋放則程式會永遠卡住。
當生產者 thread 將資料放進 Task Queue 後,m_condNotEmpty.notify_one() 會喚醒一個暫停中的消費者 thread,注意此處 m_condNotEmpty.notify_one() 的呼叫是在 { … } 之外,不在外面的話,因為 m_mutex 尚未被 Put function 釋放,有可能 Take function 內被喚醒的消費者 thread 會搶不到 m_mutex,又會短暫進入暫停,程式執行效率會稍慢一些。
好了,這就是全部了,謝謝各位。以下附上完整的程式碼,完整的程式碼複雜許多,未來再細細解釋吧!
#pragma once
#include
#include
#include
template
class CTaskQueue final
{
public:
CTaskQueue() : m_bStop(true), m_mutex(), m_queue(), m_condNotEmpty() {}
CTaskQueue(const CTaskQueue&) = delete;
CTaskQueue(CTaskQueue&&) = delete;
CTaskQueue& operator=(const CTaskQueue&) = delete;
CTaskQueue& operator=(CTaskQueue&&) = delete;
~CTaskQueue()
{
Stop();
}
public:
void Start()
{
std::lock_guard lock(m_mutex);
m_bStop = false;
}
void Stop()
{
{
std::lock_guard lock(m_mutex);
m_bStop = true;
}
m_condNotEmpty.notify_all();
}
void Clear()
{
std::lock_guard lock(m_mutex);
m_queue.clear();
}
void Put(const T& t)
{
Add(t);
}
void Put(T&& t)
{
Add(std::move(t));
}
T Take()
{
std::unique_lock lock(m_mutex);
m_condNotEmpty.wait(lock, [this]
{
return !this->m_queue.empty() || this->m_bStop;
});
if (m_bStop)
{
return T();
}
auto t = std::move(m_queue.front());
m_queue.pop();
return t;
}
auto TakeAll()
{
std::unique_lock lock(m_mutex);
m_condNotEmpty.wait(lock, [this]
{
return !this->m_queue.empty() || this->m_bStop;
});
if (m_bStop)
{
return std::queue();
}
return std::move(m_queue);
}
private:
template
void Add(T&& t)
{
{
std::lock_guard lock(m_mutex);
if (m_bStop)
{
return;
}
m_queue.push(std::forward(t));
}
m_condNotEmpty.notify_one();
}
private:
bool m_bStop;
std::mutex m_mutex;
std::queue m_queue;
std::condition_variable m_condNotEmpty;
};