好久沒有寫blog囉
冷了一個月了..T_T
前一陣子比趨勢
最近又在打混
唯一有做事情的大概就是研究一下windows platform sdk吧
一個我好久沒有接觸的東西..
因為需要所以實作一個blocking queue
也就是應用producer-consumer這個pattern
沒想到java那麼簡單可以做到的事情跑到windows上就變成那麼困難
我定義了一個BlockingQueue的class
裡面有三個method
enqueue dequeue peek
enqueue是塞資料到queue
dequeue跟peek都是取資料
前者是blocking等到有資料
後者是不管有沒有都會馬上回傳
定義如下
//queue.h
#ifndef queue_h
#define queue_h
#include <windows.h>
#include <queue>
#include <assert.h>
using namespace std;
typedef struct _QueueItem{
int x;
int y;
} QueueItem;
class CBlockingQueue{
private:
queue<QueueItem> m_queue;
HANDLE m_mtx;
HANDLE m_evtAvail;
public:
CBlockingQueue();
virtual ~CBlockingQueue();
void enqueue(const QueueItem&);
BOOL dequeue(QueueItem*);
BOOL peek(QueueItem*);
};
#endif
實作的關鍵在於使用mutex跟event
mutex是一個物件 他只允許同時只能有一個thread擁有這個物件
它透過CreateMutex來產生
WaitForSingleObject或WaitForMultipleObjects來取得這個mutex
而ReleaseMutex來釋放
透過他我門能夠保證我們放入跟取出queue的動作是同時只有一個thread在執行
但是這還不夠
在producer-consumer pattern中
當consumer等不到東西時(queue size = 0)
會靜靜得等producer放東西進來的事件發生
這時候我們就要使用event這個物件
event的狀態可能是set或是unset
我門可以用SetEvent跟ResetEvent來設定他的狀態
同樣的 我門可以使用WaitForSingleObject或WaitForMultipleObjects
來等待這個event被set
所以在以下的程式裡
當enqueue放東西進來時
我門就把m_evtAvail這個event設為set
代表queue裡還有東西
而dequeue時就可以等待m_evtAvail設為set
則透過peek這個method去取得資料
值得注意的是...
在dequeue當中
當event被設為set並不保證peek一定會取得資料
因為可能在此同時也有另一個consumer也取得event被set的通知
而搶先一步去取得資料
所以我們在enqueue中有一個while loop
來保證真的取得資料時候才真正回傳
以下是這部分實作的程式碼
//queue.cpp
#include "stdafx.h"
#include "queue.h"
CBlockingQueue::CBlockingQueue(){
m_mtx = ::CreateMutex(NULL, FALSE, NULL);
assert(m_mtx != NULL);
m_evtAvail = ::CreateEvent(NULL, TRUE, FALSE, NULL);
assert(m_evtAvail != NULL);
}
CBlockingQueue::~CBlockingQueue(){
if (NULL != m_mtx)
::CloseHandle(m_mtx);
if (NULL != m_evtAvail)
::CloseHandle(m_evtAvail);
}
void CBlockingQueue::enqueue(const QueueItem& item){
DWORD dwWaitResult;
//acquire mutex
dwWaitResult = WaitForSingleObject(m_mtx, INFINITE);
m_queue.push(item);
::SetEvent(m_evtAvail);
//release mutex
ReleaseMutex(m_mtx);
}
BOOL CBlockingQueue::dequeue(QueueItem *item){
DWORD dwWaitResult;
BOOL timeout = FALSE;
BOOL done = FALSE;
while(!done && !timeout){
//printf("dequeue:wait event\n");
dwWaitResult = ::WaitForSingleObject(m_evtAvail, INFINITE);
//
switch(dwWaitResult){
case WAIT_OBJECT_0:
done = peek(item);
break;
case WAIT_TIMEOUT:
timeout = TRUE;
printf("timeout event\n");
break;
case WAIT_ABANDONED:
timeout = TRUE;
printf("abandoned event\n");
break;
default:
;
}
}
return done;
}
BOOL CBlockingQueue::peek(QueueItem *item){
DWORD dwWaitResult;
BOOL done = false;
dwWaitResult = WaitForSingleObject(m_mtx, INFINITE);
switch(dwWaitResult){
case WAIT_OBJECT_0:
if(m_queue.size() > 0){
*item = m_queue.front();
m_queue.pop();
done = TRUE;
}else{
::ResetEvent(m_evtAvail);
}
break;
case WAIT_TIMEOUT:
printf("timeout mutex\n");
break;
case WAIT_ABANDONED:
printf("abandoned mutex\n");
break;
default:
;
}
ReleaseMutex(m_mtx);
return done;
}
記得在visual studio中編譯時
把Runtime程式庫這邊改成multithread的選項(多執行緒或多執行緒偵錯)
它會把/ML(/MLd)的參數改成/MT(/MTd)
這樣一些c的library才會使用正確.. 如printf, strtok等
以下是一個範例的main
// BlockingQueue.cpp
#include "stdafx.h"
#include "queue.h"
CBlockingQueue g_queue;
DWORD WINAPI ProducerFunc(LPVOID lpParameter){
char* name = (char*)lpParameter;
QueueItem item;
for(int i=1; i<100; i++){
item.x = 0; item.y = i;
g_queue.enqueue(item);
printf("[%s](%d,%d)\n", name, item.x, item.y);
::Sleep(rand() % 1000);
}
return 0;
}
DWORD WINAPI ConsumerFunc(LPVOID lpParameter){
char* name = (char*)lpParameter;
QueueItem item;
BOOL done;
while(TRUE){
item.x = 0; item.y = 0;
done = g_queue.dequeue(&item);
//if(done){
printf("[%s](%d,%d)\n", name, item.x, item.y);
//}
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
srand( (unsigned)time( NULL ) );
//create producers
DWORD dwThreadId;
HANDLE hThread;
hThread = CreateThread(NULL, 0, ProducerFunc, "producer", 0, &dwThreadId);
hThread = CreateThread(NULL, 0, ConsumerFunc, "consumer", 0, &dwThreadId);
::Sleep(1000000000);
return 0;
}
Posted by popcorny at August 2, 2004 02:11 PM
| TrackBack