觀察者模式 (Observer pattern) 1

開發看盤軟體一定會用到的設計模式 (Design Pattern)。

Observer pattern

Wiki 的介紹,本文章從省略 Observer 及 Subject 抽象基礎類別 (Abstract Class) 開始講起,一步步推導出觀察者模式,此模式是一種一對多的依賴關係。

開發看盤軟體時,會需要觀看股票,例如想看台積電 (2330.TW) 的資料,因此有必要使用訂閱 – 等待資料更新通知的設計方式。其中主題類別是被觀察者訂閱的,也就是範例中台積電這個主題 (CTSMCStockSubject ),假設現在陽春版的看盤軟體只有一種頁面,也就是只有一個觀察者,那麼確實可以省略 Observer 及 Subject 抽象基礎類別 (Abstract Base Class) 。

P.S. 這裡需要注意,股價資料在盤中是會不斷的更新並通知,如果只需要一次性通訊,例如可能是等待歷史資料完成,那麼使用 std::future 更為適合。

觀察者類別 (CObserver),有一個等待資料更新的函式 (Update),這是會被主題類別呼叫的函式,參數可能會因為需求傳入某種資料格式或是主題類別的指標,稍後再用此指標去取得資料。

class CObserver final // 台積電資料的觀察者
{
public:
	void Update(CTSMCStockSubject* pSubject)
	{}
};

主題類別 (CTSMCStockSubject) 有三個函式,訂閱 (Subscribe)、取消訂閱 (Unsubscribe) 及通知觀察者們 (Notify)。

觀察者訂閱主題時,將自己傳進主題類別內 void Subscribe(const std::shared_ptr<CObserver>& ob),取消訂閱亦然。

class CTSMCStockSubject final // 台積電資料的主題
{
public:
	void Subscribe(const std::shared_ptr<CObserver>& ob)
	{
		m_observers.emplace(ob);
	}

	void Unsubscribe(const std::shared_ptr<CObserver>& ob)
	{
		m_observers.erase(ob);
	}

	void Notify()
	{
		for (auto& ob : m_observers)
		{
			ob->Update(this);
		}
	}

private:
	std::set<std::shared_ptr<CObserver>> m_observers;
};

我們發現紀錄主題類別的觀察者成員變數是使用 std::set,而且元素是 std::shared_ptr<…>,這是為了避免重複訂閱及提供取消訂閱的簡單實作。

std::set 因為元素即 key,因此可以避免重複訂閱。

使用 std::set (O(log N)) 比起 std::vector (O(N)) 可以更快速的遍歷觀察者們,而使用 std::shared_ptr 是為了讓觀察者可以被比較,找到要取消訂閱的元素。

int main()
{
	auto ob = std::make_shared<CObserver>();

	CTSMCStockSubject subject;
	subject.Subscribe(ob);
	subject.Notify();
	subject.Unsubscribe(ob);
	return 0;
}

C++ 17 的 Read/Write Lock 使用方法

開發看盤軟體時,資料通常是讀取的頻率大於寫入的頻率,例如收到一筆股價 Tick,寫入資料結構一次,然後在多種圖形顯示,也就是讀取多次。

在 C++ 17 之前,要使用 Read/Write Lock 只能自己實現或使用系統提供的 API,但現在情況有所改變,以下就是 C++ 17 讀寫鎖的使用方法:

#include <shared_mutex>

std::shared_mutex g_mutex;

Write Lock 這樣寫

std::unique_lock<std::shared_mutex> wLock(g_mutex);

Read Lock 這樣寫

std::shared_lock<std::shared_mutex> rLock(g_mutex);

可以發現 C++ 17 Read/Write lock 的使用就是這麼平易近人且直觀。

但有一個地方要注意,一般我們都是在成員函式裡使用 std::shared_mutex,所以會將它宣告為成員變數,另外我們會用 Set function 來改變成員變數,Get function 讀取成員變數,所以會宣告 Get function 為 const 成員函式,例子如下:

#include <shared_mutex>

class CStockInfo
{
public:
	void SetPrice(double dPrice)
	{
		std::unique_lock<std::shared_mutex> wLock(m_mutex);
		m_dPrice = dPrice;
	}

	double GetPrice() const
	{
		std::shared_lock<std::shared_mutex> rLock(m_mutex);
		return m_dPrice;
	}

private:
	double m_dPrice;
	std::shared_mutex m_mutex;
};

但此時編譯程式會發現編譯器在 GetPrice 報錯,因為成員函式宣告為 const,此時不明就裡的程式設計師會乾脆將 const 拿掉,這是錯誤的,正確的做法是在宣告 std::shared_mutex 變數前加上 mutable,如此就告訴編譯器它是邏輯上的常量了。

#include <shared_mutex>

class CStockInfo
{
...
private:
	double m_dPrice;
	mutable std::shared_mutex m_mutex; // 宣告為 mutable

Task Queue 任務隊列 – 生產者與消費者

在開發看盤軟體時,時常會需要用到一種 Task Queue,可以將收到的任務存放起來,供稍後取出來處理,例如收到股價資料要依序處理。

又或者在觀察者模式中訂閱了某檔個股,而個股會有許多事件需要通知觀察者,例如歷史股價準備完成、即時股價到達或資料有誤需重抓等,觀察者可能來不及處理,需要先將事件存入 Task Queue 以便稍後處理。

另外不難想像 Task Queue 的兩端就像是生產者與消費者的關係,且可能同時存在多個生產者及多個消費者,因此 Task Queue 還必須是 Thread-safe。

Task Queue
任務隊列

以下為精簡後的 code,並且假設 Task Queue 存放的型別為 std::string,讓我們來看程式的關鍵部分,也就是 Put function 和 Take function 的寫法:

#include 
#include 
#include 
#include 

class CTaskQueue
{
public:
	void Put(const std::string& s)
	{
		{
			std::lock_guard lock(m_mutex);
			m_queue.push(s);
		}

		m_condNotEmpty.notify_one();
	}

	std::string Take()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty();
			});

		std::string s = m_queue.front();
		m_queue.pop();
		return s;
	}

private:
	std::mutex m_mutex;
	std::queue m_queue;
	std::condition_variable m_condNotEmpty;
};

首先來看 Take function,在 m_queue 為空的時候,消費者 thread 不能取得資料,因此有必要使用 std::condition_variable 根據 m_queue 是否為空的條件來暫停消費者 thread 執行,直到 m_queue 不為空時才被喚醒。注意這裡使用 std::condition_variable 的 wait 成員函式,當表達式為 true 時會繼續執行,因此要使用 !this->m_queue.empty() 來當作判斷式。

還有要注意的一點是,當消費者 thread 被暫停後,m_mutex 會被釋放,不然程式就會永遠卡住了,請各位暫停並思考一下此處奧妙,或繼續往下看也會有解答。

那麼由誰來喚醒消費者 thread 呢?接下來就將目光移到 Put function。因為需要 Thread-safe,所以 m_queue.push(s) 需要使用 std::mutex 來保護,這沒有問題,並且請注意,這裡的生產者 thread 是可以搶到 m_mutex 的,這也是為什麼 std::condition_variable 條件不滿足需要釋放 m_mutex,不釋放則程式會永遠卡住。

當生產者 thread 將資料放進 Task Queue 後,m_condNotEmpty.notify_one() 會喚醒一個暫停中的消費者 thread,注意此處 m_condNotEmpty.notify_one() 的呼叫是在 { … } 之外,不在外面的話,因為 m_mutex 尚未被 Put function 釋放,有可能 Take function 內被喚醒的消費者 thread 會搶不到 m_mutex,又會短暫進入暫停,程式執行效率會稍慢一些。

好了,這就是全部了,謝謝各位。以下附上完整的程式碼,完整的程式碼複雜許多,未來再細細解釋吧!

#pragma once

#include 
#include 
#include 

template
class CTaskQueue final
{
public:
	CTaskQueue() : m_bStop(true), m_mutex(), m_queue(), m_condNotEmpty() {}
	CTaskQueue(const CTaskQueue&) = delete;
	CTaskQueue(CTaskQueue&&) = delete;
	CTaskQueue& operator=(const CTaskQueue&) = delete;
	CTaskQueue& operator=(CTaskQueue&&) = delete;
	~CTaskQueue()
	{
		Stop();
	}

public:
	void Start()
	{
		std::lock_guard lock(m_mutex);
		m_bStop = false;
	}

	void Stop()
	{
		{
			std::lock_guard lock(m_mutex);
			m_bStop = true;
		}

		m_condNotEmpty.notify_all();
	}

	void Clear()
	{
		std::lock_guard lock(m_mutex);
		m_queue.clear();
	}

	void Put(const T& t)
	{
		Add(t);
	}

	void Put(T&& t)
	{
		Add(std::move(t));
	}

	T Take()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty() || this->m_bStop;
			});

		if (m_bStop)
		{
			return T();
		}

		auto t = std::move(m_queue.front());
		m_queue.pop();
		return t;
	}

	auto TakeAll()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty() || this->m_bStop;
			});

		if (m_bStop)
		{
			return std::queue();
		}

		return std::move(m_queue);
	}

private:
	template
	void Add(T&& t)
	{
		{
			std::lock_guard lock(m_mutex);
			if (m_bStop)
			{
				return;
			}

			m_queue.push(std::forward(t));
		}

		m_condNotEmpty.notify_one();
	}

private:
	bool m_bStop;
	std::mutex m_mutex;
	std::queue m_queue;
	std::condition_variable m_condNotEmpty;
};