Task Queue 任務隊列 - 生產者與消費者

在開發看盤軟體時,時常會需要用到一種 Task Queue,可以將收到的任務存放起來,供稍後取出來處理,例如收到股價資料要依序處理。

又或者在觀察者模式中訂閱了某檔個股,而個股會有許多事件需要通知觀察者,例如歷史股價準備完成、即時股價到達或資料有誤需重抓等,觀察者可能來不及處理,需要先將事件存入 Task Queue 以便稍後處理。

另外不難想像 Task Queue 的兩端就像是生產者與消費者的關係,且可能同時存在多個生產者及多個消費者,因此 Task Queue 還必須是 Thread-safe。


任務隊列

以下為精簡後的 code,並且假設 Task Queue 存放的型別為 std::string,讓我們來看程式的關鍵部分,也就是 Put function 和 Take function 的寫法:


class CTaskQueue
{
public:
	void Put(const std::string& s)
	{
		{
			std::lock_guard lock(m_mutex);
			m_queue.push(s);
		}
		m_condNotEmpty.notify_one();
	}
	std::string Take()
	{
		std::unique_lock lock(m_mutex);
		m_condNotEmpty.wait(lock, [this]
			{
				return !this->m_queue.empty();
			});
		std::string s = m_queue.front();
		m_queue.pop();
		return s;
	}
private:
	std::mutex m_mutex;
	std::queue m_queue;
	std::condition_variable m_condNotEmpty;
};

首先來看 Take function,在 m_queue 為空的時候,消費者 thread 不能取得資料,因此有必要使用 std::condition_variable 根據 m_queue 是否為空的條件來暫停消費者 thread 執行,直到 m_queue 不為空時才被喚醒。注意這裡使用 std::condition_variable 的 wait 成員函式,當表達式為 true 時會繼續執行,因此要使用 !this->m_queue.empty() 來當作判斷式。

還有要注意的一點是,當消費者 thread 被暫停後,m_mutex 會被釋放,不然程式就會永遠卡住了,請各位暫停並思考一下此處奧妙,或繼續往下看也會有解答。

那麼由誰來喚醒消費者 thread 呢?接下來就將目光移到 Put function。因為需要 Thread-safe,所以 m_queue.push(s) 需要使用 std::mutex 來保護,這沒有問題,並且請注意,這裡的生產者 thread 是可以搶到 m_mutex 的,這也是為什麼 std::condition_variable 條件不滿足需要釋放 m_mutex,不釋放則程式會永遠卡住。

當生產者 thread 將資料放進 Task Queue 後,m_condNotEmpty.notify_one() 會喚醒一個暫停中的消費者 thread,注意此處 m_condNotEmpty.notify_one() 的呼叫是在 { … } 之外,不在外面的話,因為 m_mutex 尚未被 Put function 釋放,有可能 Take function 內被喚醒的消費者 thread 會搶不到 m_mutex,又會短暫進入暫停,程式執行效率會稍慢一些。

各位可以點下面影片觀看更多的說明:



留言