August 02, 2004

Producer-Consumer in windows platform sdk

好久沒有寫blog囉
冷了一個月了..T_T
前一陣子比趨勢
最近又在打混
唯一有做事情的大概就是研究一下windows platform sdk吧
一個我好久沒有接觸的東西..

因為需要所以實作一個blocking queue
也就是應用producer-consumer這個pattern
沒想到java那麼簡單可以做到的事情跑到windows上就變成那麼困難

我定義了一個BlockingQueue的class
裡面有三個method
enqueue dequeue peek
enqueue是塞資料到queue
dequeue跟peek都是取資料
前者是blocking等到有資料
後者是不管有沒有都會馬上回傳
定義如下

//queue.h
#ifndef queue_h
#define queue_h

#include <windows.h>
#include <queue>
#include <assert.h>
using namespace std;

typedef struct _QueueItem{
	int x;
	int y;
} QueueItem;

class CBlockingQueue{
private:
	queue<QueueItem> m_queue;
	HANDLE m_mtx;
	HANDLE m_evtAvail;
public:
	CBlockingQueue();
	virtual ~CBlockingQueue();
	void enqueue(const QueueItem&);
	BOOL dequeue(QueueItem*);
	BOOL peek(QueueItem*);
};

#endif


實作的關鍵在於使用mutex跟event
mutex是一個物件 他只允許同時只能有一個thread擁有這個物件
它透過CreateMutex來產生
WaitForSingleObject或WaitForMultipleObjects來取得這個mutex
而ReleaseMutex來釋放
透過他我門能夠保證我們放入跟取出queue的動作是同時只有一個thread在執行

但是這還不夠
在producer-consumer pattern中
當consumer等不到東西時(queue size = 0)
會靜靜得等producer放東西進來的事件發生
這時候我們就要使用event這個物件
event的狀態可能是set或是unset
我門可以用SetEvent跟ResetEvent來設定他的狀態
同樣的 我門可以使用WaitForSingleObject或WaitForMultipleObjects
來等待這個event被set

所以在以下的程式裡
當enqueue放東西進來時
我門就把m_evtAvail這個event設為set
代表queue裡還有東西
而dequeue時就可以等待m_evtAvail設為set
則透過peek這個method去取得資料

值得注意的是...
在dequeue當中
當event被設為set並不保證peek一定會取得資料
因為可能在此同時也有另一個consumer也取得event被set的通知
而搶先一步去取得資料
所以我們在enqueue中有一個while loop
來保證真的取得資料時候才真正回傳

以下是這部分實作的程式碼

//queue.cpp
#include "stdafx.h"
#include "queue.h"

CBlockingQueue::CBlockingQueue(){		
	m_mtx = ::CreateMutex(NULL, FALSE, NULL);
	assert(m_mtx != NULL);
	m_evtAvail = ::CreateEvent(NULL, TRUE, FALSE, NULL);
	assert(m_evtAvail != NULL);
}
CBlockingQueue::~CBlockingQueue(){
	if (NULL != m_mtx)
		::CloseHandle(m_mtx);
	if (NULL != m_evtAvail)
		::CloseHandle(m_evtAvail);
}
void CBlockingQueue::enqueue(const QueueItem& item){
	DWORD dwWaitResult; 
	//acquire mutex
	dwWaitResult = WaitForSingleObject(m_mtx, INFINITE);   
	m_queue.push(item);
	::SetEvent(m_evtAvail);
	//release mutex
	ReleaseMutex(m_mtx);        
}
BOOL CBlockingQueue::dequeue(QueueItem *item){
	DWORD dwWaitResult; 
	BOOL timeout = FALSE;
	BOOL done = FALSE;
	while(!done && !timeout){
		//printf("dequeue:wait event\n");
		dwWaitResult = ::WaitForSingleObject(m_evtAvail, INFINITE);
		//
		switch(dwWaitResult){
		case WAIT_OBJECT_0:
			done = peek(item);
			break;
		case WAIT_TIMEOUT:
			timeout = TRUE;
			printf("timeout event\n");
			break;
		case WAIT_ABANDONED:
			timeout = TRUE;
			printf("abandoned event\n");
			break;
		default:
			;
		}
	}
	return done;
}
BOOL CBlockingQueue::peek(QueueItem *item){
	DWORD dwWaitResult; 
	BOOL done = false;
	dwWaitResult = WaitForSingleObject(m_mtx, INFINITE);   
	switch(dwWaitResult){
		case WAIT_OBJECT_0:
			if(m_queue.size() > 0){
				*item = m_queue.front();				
				m_queue.pop();
				done = TRUE;
			}else{
				::ResetEvent(m_evtAvail);
			}
			break;
		case WAIT_TIMEOUT:
			printf("timeout mutex\n");
			break;
		case WAIT_ABANDONED:
			printf("abandoned mutex\n");
			break;
		default:
			;
	}
	ReleaseMutex(m_mtx);   
	return done;
}

記得在visual studio中編譯時
把Runtime程式庫這邊改成multithread的選項(多執行緒或多執行緒偵錯)
它會把/ML(/MLd)的參數改成/MT(/MTd)
這樣一些c的library才會使用正確.. 如printf, strtok等
以下是一個範例的main

// BlockingQueue.cpp

#include "stdafx.h"
#include "queue.h"

CBlockingQueue g_queue;

DWORD WINAPI ProducerFunc(LPVOID lpParameter){
	char* name = (char*)lpParameter;
	QueueItem item;
	for(int i=1; i<100; i++){
		item.x = 0; item.y = i;
		g_queue.enqueue(item);
		printf("[%s](%d,%d)\n", name, item.x, item.y);
		::Sleep(rand() % 1000);
	}
	return 0;
}

DWORD WINAPI ConsumerFunc(LPVOID lpParameter){
	char* name = (char*)lpParameter;
	QueueItem item;
	BOOL done;
	while(TRUE){		
		item.x = 0; item.y = 0;
		done = g_queue.dequeue(&item);
		//if(done){
			printf("[%s](%d,%d)\n", name, item.x, item.y);
		//}

	}
	return 0;
}


int _tmain(int argc, _TCHAR* argv[])
{
	srand( (unsigned)time( NULL ) );
	//create producers
	DWORD dwThreadId;
	HANDLE hThread; 
	
	hThread = CreateThread(NULL, 0,	ProducerFunc, "producer", 0, &dwThreadId); 
	hThread = CreateThread(NULL, 0,	ConsumerFunc, "consumer", 0, &dwThreadId); 

	::Sleep(1000000000);
	return 0;
}

Posted by popcorny at August 2, 2004 02:11 PM | TrackBack
Comments
Post a comment









Remember personal info?