现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

信号量和互斥量的使用实例

2012-08-27 06:57 工业·编程 ⁄ 共 6998字 ⁄ 字号 暂无评论

/******************************************************************************
Module:  Queue.cpp
Notices: Copyright (c) 2008 Jeffrey Richter & Christophe Nasarre
******************************************************************************/

#include "..\CommonFiles\CmnHdr.h"     /* See Appendix A. */
#include <windowsx.h>
#include <tchar.h>
#include <StrSafe.h>
#include "Resource.h"

///////////////////////////////////////////////////////////////////////////////

class CQueue {
public:
   struct ELEMENT {
      int m_nThreadNum, m_nRequestNum;
      // Other element data should go here
   };
   typedef ELEMENT* PELEMENT;

private:
   PELEMENT m_pElements;        // Array of elements to be processed
   int      m_nMaxElements;     // Maximum # of elements in the array
   HANDLE   m_h[2];             // Mutex & semaphore handles
   HANDLE   &m_hmtxQ;           // Reference to m_h[0]
   HANDLE   &m_hsemNumElements; // Reference to m_h[1]

public:
   CQueue(int nMaxElements);
   ~CQueue();

   BOOL Append(PELEMENT pElement, DWORD dwMilliseconds);
   BOOL Remove(PELEMENT pElement, DWORD dwMilliseconds);
};

///////////////////////////////////////////////////////////////////////////////

CQueue::CQueue(int nMaxElements)
   : m_hmtxQ(m_h[0]), m_hsemNumElements(m_h[1]) {

   m_pElements = (PELEMENT)
      HeapAlloc(GetProcessHeap(), 0, sizeof(ELEMENT) * nMaxElements);
   m_nMaxElements = nMaxElements;
   m_hmtxQ = CreateMutex(NULL, FALSE, NULL);
   m_hsemNumElements = CreateSemaphore(NULL, 0, nMaxElements, NULL);
}

///////////////////////////////////////////////////////////////////////////////

CQueue::~CQueue() {

   CloseHandle(m_hsemNumElements);
   CloseHandle(m_hmtxQ);
   HeapFree(GetProcessHeap(), 0, m_pElements);
}

///////////////////////////////////////////////////////////////////////////////

BOOL CQueue::Append(PELEMENT pElement, DWORD dwTimeout) {

   BOOL fOk = FALSE;
   DWORD dw = WaitForSingleObject(m_hmtxQ, dwTimeout);

   if (dw == WAIT_OBJECT_0) {
      // This thread has exclusive access to the queue

      // Increment the number of elements in the queue
      LONG lPrevCount;
      fOk = ReleaseSemaphore(m_hsemNumElements, 1, &lPrevCount);
      if (fOk) {
         // The queue is not full, append the new element
         m_pElements[lPrevCount] = *pElement;
      } else {

         // The queue is full, set the error code and return failure
         SetLastError(ERROR_DATABASE_FULL);
      }

      // Allow other threads to access the queue
      ReleaseMutex(m_hmtxQ);

   } else {
      // Timeout, set error code and return failure
      SetLastError(ERROR_TIMEOUT);
   }

   return(fOk);   // Call GetLastError for more info
}

///////////////////////////////////////////////////////////////////////////////

BOOL CQueue::Remove(PELEMENT pElement, DWORD dwTimeout) {

   // Wait for exclusive access to queue and for queue to have element.
   BOOL fOk = (WaitForMultipleObjects(_countof(m_h), m_h, TRUE, dwTimeout)
      == WAIT_OBJECT_0);

   if (fOk) {
      // The queue has an element, pull it from the queue
      *pElement = m_pElements[0];

      // Shift the remaining elements down
      MoveMemory(&m_pElements[0], &m_pElements[1],
         sizeof(ELEMENT) * (m_nMaxElements - 1));

      // Allow other threads to access the queue
      ReleaseMutex(m_hmtxQ);

   } else {
      // Timeout, set error code and return failure
      SetLastError(ERROR_TIMEOUT);
   }

   return(fOk);   // Call GetLastError for more info
}

///////////////////////////////////////////////////////////////////////////////

CQueue g_q(10);                     // The shared queue
volatile LONG g_fShutdown = FALSE;  // Signals client/server threads to die
HWND g_hwnd;                        // How client/server threads give status

// Handles to all client/server threads & number of client/server threads
HANDLE g_hThreads[MAXIMUM_WAIT_OBJECTS]; 
int    g_nNumThreads = 0;

///////////////////////////////////////////////////////////////////////////////

DWORD WINAPI ClientThread(PVOID pvParam) {

   int nThreadNum = PtrToUlong(pvParam);
   HWND hwndLB = GetDlgItem(g_hwnd, IDC_CLIENTS);

   int nRequestNum = 0;
   while (1 != InterlockedCompareExchange(&g_fShutdown, 0, 0)) {

      // Keep track of the current processed element
      nRequestNum++;
      TCHAR sz[1024];
      CQueue::ELEMENT e = { nThreadNum, nRequestNum };

      // Try to put an element on the queue
      if (g_q.Append(&e, 200)) {

         // Indicate which thread sent it and which request
         StringCchPrintf(sz, _countof(sz), TEXT("Sending %d:%d"),
            nThreadNum, nRequestNum);
      } else {

         // Couldn't put an element on the queue
         StringCchPrintf(sz, _countof(sz), TEXT("Sending %d:%d (%s)"),
            nThreadNum, nRequestNum, (GetLastError() == ERROR_TIMEOUT)
               ? TEXT("timeout") : TEXT("full"));
      }

      // Show result of appending element
      ListBox_SetCurSel(hwndLB, ListBox_AddString(hwndLB, sz));
      Sleep(2500);   // Wait before appending another element
   }
   return(0);
}

///////////////////////////////////////////////////////////////////////////////

DWORD WINAPI ServerThread(PVOID pvParam) {

   int nThreadNum = PtrToUlong(pvParam);
   HWND hwndLB = GetDlgItem(g_hwnd, IDC_SERVERS);

   while (1 != InterlockedCompareExchange(&g_fShutdown, 0, 0)) {

      TCHAR sz[1024];
      CQueue::ELEMENT e;

      // Try to get an element from the queue
      if (g_q.Remove(&e, 5000)) {

         // Indicate which thread is processing it, which thread
         // sent it and which request we're processing
         StringCchPrintf(sz, _countof(sz), TEXT("%d: Processing %d:%d"),
            nThreadNum, e.m_nThreadNum, e.m_nRequestNum);

         // The server takes some time to process the request
         Sleep(2000 * e.m_nThreadNum);

      } else {
         // Couldn't get an element from the queue
         StringCchPrintf(sz, _countof(sz), TEXT("%d: (timeout)"), nThreadNum);
      }

      // Show result of processing element
      ListBox_SetCurSel(hwndLB, ListBox_AddString(hwndLB, sz));
   }

   return(0);
}

///////////////////////////////////////////////////////////////////////////////

BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam) {

   chSETDLGICONS(hwnd, IDI_QUEUE);

   g_hwnd = hwnd; // Used by client/server threads to show status

   DWORD dwThreadID;

   // Create the client threads
   for (int x = 0; x < 4; x++)
      g_hThreads[g_nNumThreads++] =
         chBEGINTHREADEX(NULL, 0, ClientThread, (PVOID) (INT_PTR) x,
            0, &dwThreadID);

   // Create the server threads
   for (int x = 0; x < 2; x++)
      g_hThreads[g_nNumThreads++] =
         chBEGINTHREADEX(NULL, 0, ServerThread, (PVOID) (INT_PTR) x,
            0, &dwThreadID);

   return(TRUE);
}

///////////////////////////////////////////////////////////////////////////////

void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtl, UINT codeNotify) {

   switch (id) {
      case IDCANCEL:
         EndDialog(hwnd, id);
         break;
   }
}

///////////////////////////////////////////////////////////////////////////////

INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam) {
   switch (uMsg) {
      chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
      chHANDLE_DLGMSG(hwnd, WM_COMMAND,    Dlg_OnCommand);
   }
   return(FALSE);
}

///////////////////////////////////////////////////////////////////////////////

int WINAPI _tWinMain(HINSTANCE hinstExe, HINSTANCE, PTSTR pszCmdLine, int) {

   DialogBox(hinstExe, MAKEINTRESOURCE(IDD_QUEUE), NULL, Dlg_Proc);
   InterlockedExchange(&g_fShutdown, TRUE);

   // Wait for all the threads to terminate & then cleanup
   WaitForMultipleObjects(g_nNumThreads, g_hThreads, TRUE, INFINITE);
   while (g_nNumThreads--)
      CloseHandle(g_hThreads[g_nNumThreads]);

   return(0);
}

//////////////////////////////// End of File //////////////////////////////////

给我留言

留言无头像?