现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

Win32下两种用于C++的线程同步类

2012-07-17 06:57 工业·编程 ⁄ 共 10257字 ⁄ 字号 暂无评论

线程同步是多线程程序设计的核心内容,它的目的是正确处理多线程并发时的各种问题,例如线程的等待、多个线程访问同一数据时的互斥,防死锁等。Win32提供多种内核对象和手段用于线程同步,如互斥量、信号量、事件、临界区等。所不同的是,互斥量、信号量、事件都是Windows的内核对象,当程序对这些对象进行控制时会自动转换到核心态,而临界区本身不是内核对象,它是工作在用户态的。我们知道从用户态转换到核心态是需要以时间为代价的,所以如果能在用户态就简单解决的问题,就可以不必劳烦核心态了。
这里我要说的是两种用于C++的多线程同步类,通过对这两种类的使用就可以方便的实现对变量或代码段的加锁控制,从而防止多线程对变量不正确的操作。
所谓加锁,就是说当我们要访问某关键变量之前,都需要首先获得允许才能继续,如果未获得允许则只有等待。一个关键变量拥有一把锁,一个线程必须先得到这把锁(其实称为钥匙可能更形象)才可以访问这个变量,而当某个变量持有这把锁的时候,其他线程就不能重复的得到它,只有等持有锁的线程把锁归还以后其他线程才有可能得到它。之所以这样做,就是为了防止一个线程读取某对象途中另一线程对它进行了修改,或两线程同时对一变量进行修改,例如:

// 全局:
struct MyStruct ... { int a, b; } ;
MyStruct s;
// 线程1:
int a = s.a;
int b = s.b;
// 线程2:
s.a ++ ;
s.b -- ;

如果实际的执行顺序就是上述书写的顺序那到没有什么,但如果线程2的执行打断了线程1,变为如下顺序:

int a = s.a; //线程1
s.a++; //线程2
s.b++; //线程2
int b = s.b; //线程1

那么这时线程1读出来的a和b就会有问题了,因为a是在修改前读的,而b是在修改后读的,这样读出来的是不完整的数据,会对程序带来不可预料的后果。天知道两个程的调度顺序是什么样的。为了防止这种情况的出现,需要对变量s加锁,也就是当线程1得到锁以后就可以放心的访问s,这时如果线程2要修改s,只有等线程1访问完成以后将锁释放才可以,从而保证了上述两线程交叉访问变量的情况不会出现。
使用Win32提供的临界区可以方便的实现这种锁:

// 全局:
CRITICAL_SECTION cs;
InitializeCriticalSection( & cs);
// 线程1:
EnterCriticalSection( & cs);
int a = s.a;
int b = s.b;
LeaveCriticalSection( & cs);
// 线程2:
EnterCriticalSection( & cs);
s.a ++ ;
s.b -- ;
LeaveCriticalSection( & cs);
// 最后:
DeleteCriticalSection( & cs);

代码中的临界区变量(cs)就可以看作是变量s的锁,当函数EnterCriticalSection返回时,当前线程就获得了这把锁,之后就是对变量的访问了。访问完成后,调用LeaveCriticalSection表示释放这把锁,允许其他线程继续使用它。
如果每当需要对一个变量进行加锁时都需要做这些操作,显得有些麻烦,而且变量cs与s只有逻辑上的锁关系,在语法上没有什么联系,这对于锁的管理带来了不小的麻烦。程序员总是最懒的,可以想出各种偷懒的办法来解决问题,例如让被锁的变量与加锁的变量形成物理上的联系,使得锁变量成为被锁变量不可分割的一部分,这听起来是个好主意。
首先想到的是把锁封闭在一个类里,让类的构造函数和析构函数来管理对锁的初始化和锁毁动作,我们称这个锁为“实例锁”:

class InstanceLockBase
... {
CRITICAL_SECTION cs;
protected :
InstanceLockBase() ... { InitialCriticalSection( & cs); }
~ InstanceLockBase() ... { DeleteCriticalSection( & cs); }
} ;

如果熟悉C++,看到这里一定知道后面我要干什么了,对了,就是继承,因为我把构造函数和析构函数都声明为保护的(protected),这样唯一的作用就是在子类里使用它。让我们的被保护数据从这个类继承,那么它们不就不可分割了吗:

struct MyStruct: public InstanceLockBase
... { … } ;

什么?结构体还能从类继承?当然,C++中结构体和类除了成员的默认访问控制不同外没有什么不一样,class能做的struct也能做。此外,也许你还会问,如果被锁的是个简单类型,不能继承怎么办,那么要么用一个类对这个简单类型进行封装(记得Java里有int和Integer吗),要么只好手工管理它们的联系了。如果被锁类已经有了基类呢?没关系,C++是允许多继承的,多一个基类也没什么。
现在我们的数据里面已经包含一把锁了,之后就是要添加加锁和解锁的动作,把它们作为InstanceLockBase类的成员函数再合适不过了:

class InstanceLockBase
... {
CRITICAL_SECTION cs;
void Lock() ... { EnterCriticalSection( & cs); }
void Unlock() ... { LeaveCriticalSection( & cs); }

} ;

看到这里可能会发现,我把Lock和Unlock函数都声明为私有了,那么如何访问这两个函数呢?是的,我们总是需要有一个地方来调用这两个函数以实现加锁和解锁的,而且它们总应该成对出现,但C++语法本身没能限制我们必须成对的调用两个函数,如果加完锁忘了解,那后果是严重的。这里有一个例外,就是C++对于构造函数和析构函数的调用是自动成对的,对了,那就把对Lock和Unlock的调用专门写在一个类的构造函数和析构函数中:

class InstanceLock
... {
InstanceLockBase * _pObj;
public :
InstanceLock(InstanceLockBase * pObj)
... {
_pObj = pObj; // 这里会保存一份指向s的指针,用于解锁
if (NULL != _pObj)
_pObj -> Lock(); // 这里加锁
}
~ InstanceLock()
... {
if (NULL != _pObj)
_pObj -> Unlock(); // 这里解锁
}
} ;

最后别忘了在类InstanceLockBase中把InstanceLock声明为友元,使得它能正确访问Lock和Unlock这两个私有函数:

class InstanceLockBase
... {
friend class InstanceLock;

} ;

好了,有了上面的基础,现在对变量s的加解锁管理变成了对InstanceLock的实例的生命周期的管理了。假如我们有一个函数ModifyS中要对s进行修改,那么只要在函数一开始就声明一个InstaceLock的实例,这样整个函数就自动对s加锁,一旦进入这个函数,其他线程就都不能获得s的锁了:

void ModifyS()
... {
InstanceLock lock ( & s); // 这里已经实现加锁了
// some operations on s
} // 一旦离开lock对象的作用域,自动解锁

如果是要对某函数中一部分代码加锁,只要用一对大括号把它们括起来再声明一个lock就可以了:


... {
InstanceLock lock ( & s);
// do something …
}

好了,就是这么简单。下面来看一个测试。
首先准备一个输出函数,对我们理解程序有帮助。它会在输出我们想输出的内容同时打出行号和时间:

void Say( char * text)
... {
static int count = 0 ;
SYSTEMTIME st;
::GetLocalTime( & st);
printf( " %03d [%02d:%02d:%02d.%03d]%s " , ++ count, st.wHour, st.wMinute, st.wSecond, st.wMilliseconds, text);
}

当然,原则上当多线程都调用这个函数时应该对其静态局部变量count进行加锁,这里就省略了。
我们声明一个非常简单的被锁的类型,并生成一个实例:

class MyClass: public InstanceLockBase
... {} ;
MyClass mc;

子线程的任务就是对这个对象加锁,然后输出一些信息:

DWORD CALLBACK ThreadProc(LPVOID param)
... {
InstanceLock il( & mc);
Say( " in sub thread, lock " );
Sleep( 2000 );
Say( " in sub thread, unlock " );
return 0 ;
}

这里会输出两条信息,一是在刚刚获得锁的时间,二是在释放锁的时候,中间通过Sleep来延迟2秒。
主线程负责开启子线程,然后也对mc加锁:

CreateThread( 0 , 0 , ThreadProc, 0 , 0 , 0 );
... {
InstanceLock il( & mc);
Say( " in main thread, lock " );
Sleep( 3000 );
Say( " in main thread, lock " );
}

运行此程序,得到的输出如下:

001 [13:43:23.781]in main thread, lock
002 [13:43:26.781]in main thread, lock
003 [13:43:26.781]in sub thread, lock
004 [13:43:28.781]in sub thread, unlock

从其输出的行号和时间可以清楚的看到两个线程间的互斥:当主线程恰好首先获得锁时,它会延迟3秒,然后释放锁,之后子线程才得以继续进行。这个例子也证明我们的类工作的很好。
总结一下,要使用InstanceLock系列类,要做的就是:
1、让被锁类从InstanceLockBase继承
2、所有要访问被锁对象的代码前面声明InstanceLock的实例,并传入被锁对象的指针。
附:完整源代码:

#pragma once
#include < windows.h >
class InstanceLock;
class InstanceLockBase
... {
friend class InstanceLock;
CRITICAL_SECTION cs;
void Lock()
... {
::EnterCriticalSection( & cs);
}
void Unlock()
... {
::LeaveCriticalSection( & cs);
}
protected :
InstanceLockBase()
... {
::InitializeCriticalSection( & cs);
}
~ InstanceLockBase()
... {
::DeleteCriticalSection( & cs);
}
} ;
class InstanceLock
... {
InstanceLockBase * _pObj;
public :
InstanceLock(InstanceLockBase * pObj)
... {
_pObj = pObj;
if (NULL != _pObj)
_pObj -> Lock();
}
~ InstanceLock()
... {
if (NULL != _pObj)
_pObj -> Unlock();
}
} ;

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

上一篇中我介绍了一种通过封闭Critical Section对象而方便的使用互斥锁的方式,文中所有的例子是两个线程对同一数据一读一写,因此需要让它们在这里互斥,不能同时访问。而在实际情况中可能会有更复杂的情况出现,就是多个线程访问同一数据,一部分是读,一部分是写。我们知道只有读-写或写-写同时进行时可能会出现问题,而读-读则可以同时进行,因为它们不会对数据进行修改,所以也有必要在C++中封装一种方便的允许读-读并发、读-写与写-写互斥的锁。要实现这种锁,使用临界区就很困难了,不如改用内核对象,这里我使用的是互斥量(Mutex)。
总体的结构与上一篇中的类似,都是写出一个对锁进行封装的基类,再写一个用于调用加、解锁函数的类,通过对第二个类的生命周期的管理实现加锁和解锁。这里涉及到两个新问题,一是加锁、解锁动作都有两种,一种是加/解读锁,一种是加/解写锁;二是为了允许读-读并发,这里只声明一个Mutex是不够的,必须要声明多个Mutex,而且有多少个Mutex就同时允许多少个读线程并发,之所以这么说,是因为我们要使用的API函数是WaitForMultipleObjects。
WaitForMultipleObjects函数的功能就是等待对象状态被设置,MSDN中对它的说明为:
Waits until one or all of the specified objects are in the signaled state or the time-out interval elapses.
这是个很好用的函数,我们可以用它来等待某个或某几个对象,并且允许设置超时时间,等待成功时与超时时返回的值是不同的。如果返回的值比WAIT_ABANDONED小则表示等待成功。“等待成功”对于不同类型的内核对象有不同的意义,例如对于进程或线程对象,等待成功就表示进程或线程执行结束了;对于互斥量对象,则表示此对象现在不被任何其他线程拥有,并且一旦等待成功,当前线程即拥有了此互斥量,其他线程则不能同时拥有,直接调用ReleaseMutex函数主动释放互斥量。
与WaitForMultipleObjects类似的还有一个函数WaitForSingleObject,它的功能比较简单,只针对单一个对象,而WaitForMultipleObjects可以同时等待多个对象,并且可以设置是否等待所有对象。
上一篇文章中用的InstanceLockBase类里面封装了一个Critical Section对象,这里则要封装一组Mutex的Handle,那么这一组是多少个呢?它应该由使用此类的程序中定义,例如可以用动态数组的方法:

//基类:
class RWLockBase //表示Read/Write Lock
...{
HANDLE* handles;
protected:
RWLockBase(int handleCount) ...{ handles = new HANDLE[handleCount]; }

};
//子类:
class MyClass: public RWLockBase
...{
MyClass(): RWLockBase(3) ...{}

};

这确实是个不错的办法,通过在子类构造函数的初始化段中调用基类构造函数并传参,使得这个动态数组得以正确初始化,不过这样看着不太爽,子类必须两次出现“RWLockBase”一词,能不能像InstanceLockBase那样只要继承了就好呢?答案是肯定的,只要用C++模板即可:

template <int maxReadCount>
class RWLockBase
...{
HANDLE handles[maxReadCount];

};

使用模板附带这么一个好处,因为模板参数是在编译期可以确定的,所以无需再用动态数组,直接在栈上分配即可。而使用模板引出一个新问题,就是相应的Lock类(RWLock)在构造时传的对象指针时的类型声明,直接写成RWLock(RWLockBase* pObj)肯定是不行的,因为必须指定模板参数,并且其值还必须与声明RWLockBase时所指定的值一致才行,从而客户端代码就必须两次指定模板参数值,不爽!解决的办法也是有一个,就是把RWLockBase变成夹层类,为它再声明一个基类,让RWLock接收的是基类指针,并把Lock、Unlock等函数放在基类中,声明为纯虚函数,实现写在夹层类中:

class _RWLockBase
...{
friend class RWLock;
protected:
virtual DWORD ReadLock(int timeout) = 0;
virtual void ReadUnlock(int handleIndex) = 0;
virtual DWORD WriteLock(int timeout) = 0;
virtual void WriteUnlock() = 0;
};

模板类RWLockBase从_RWLockBase继承,并对四个函数写出实现:

template <int maxReadCount = 3> //这里给一个缺省参数,尽量减少客户端代码量
class RWLockBase: public _RWLockBase
...{
HANDLE handles[maxReadCount];
DWORD ReadLock(int timeout) //加读锁,只要等到一个互斥量返回即可
...{
return ::WaitForMultipleObjects(maxReadCount, handles, FALSE, timeout);
}
void ReadUnlock(int handleIndex) //解读锁,释放已获得的互斥量
...
DWORD WriteLock(int timeout) //加写锁,等到所有互斥量,从而与其他所有线程互斥
...{
return ::WaitForMultipleObjects(maxReadCount, handles, TRUE, timeout);
}
void WriteUnlock() //解写锁,释放所有的互斥量
...{
for(int i = 0; i < maxReadCount; i++)
::ReleaseMutex(handles[i]);
}
protected:
WLockBase() //构造函数,初始化每个互斥量
..{
for(int i = 0; i < maxReadCount; i++)
handles[i] = ::CreateMutex(0, FALSE, 0);
}
~RWLockBase() //析构函数,销毁对象
...{
for(int i = 0; i < maxReadCount; i++)
::CloseHandle(handles[i]);
}
};

而相应的锁类也会稍复杂一些:

class RWLock
...{
bool lockSuccess; //因为有可能超时,需要保存是否等待成功
int readLockHandleIndex; //对于读锁,需要知道获得的是哪个互斥量
_RWLockBase* _pObj; //目标对象基类指针
public:
//这里通过第二个参数决定是加读锁还是写锁,第三个参数为超时的时间
RWLock(_RWLockBase* pObj, bool readLock = true, int timeout = 3000)
...{
_pObj = pObj;
lockSuccess = FALSE;
readLockHandleIndex = -1;
if(NULL == _pObj)
return;
if(readLock) //读锁
...{
DWORD retval = _pObj->ReadLock(timeout);
if(retval < WAIT_ABANDONED) //返回值小于WAIT_ABANDONED表示成功
...{ //其值减WAIT_OBJECT_0就是数组下标
readLockHandleIndex = retval - WAIT_OBJECT_0;
lockSuccess = TRUE;
}
}
else
...{
WORD retval = _pObj->WriteLock(timeout);
if(retval < WAIT_ABANDONED) //写锁时获得了所有互斥量,无需保存下标
lockSuccess = TRUE;
}
}
~RWLock()
...{
if(NULL == _pObj)
return;
if(readLockHandleIndex > -1)
_pObj->ReadUnlock(readLockHandleIndex);
else
_pObj->WriteUnlock();
}
bool IsLockSuccess() const ...{ return lockSuccess; }
};

这样一来,读/写锁的类也就完成了,使用时与InstanceLock类似:
1、被锁对象从RWLockBase<>类继承
2、需要加读锁时,声明一个RWLock实例,并指出要加的是读锁
3、需要加写锁时,声明一个RWLock实例,并指出要加的是写锁
这里还是要多说两句,虽然使用纯虚函数结合模板类,使得客户端代码量减到最少,但性能上有一些影响,因为声明了虚函数,则实例中必然存在4个字节的VPTR,调用虚函数时则要查找VTABLE,空间和时间上都有微小的牺牲。而如果不使用模板类,则没有虚函数的代价,但也有牺牲:不使用模板类则需要使用动态数组,动态数组本身需要程序运行时在堆上分配,这也需要时间;指向动态数组的指针也需要占用内存,所以空间上的开锁是一样的,时间上虽然动态分配内存需要的时间应该比虚函数的调用要慢一点,但初始化只需要一次,总体来说也是值得的。所以最终要使用哪一种,就看具体需要了。
这里也给出一个实验。这里所用的被锁类也上一篇类似,简单的从RWLockBase类继承:

class MyClass2: public RWLockBase<>
...{};
MyClass2 mc2;

看看两个线程函数:

//读线程
DWORD CALLBACK ReadThreadProc(LPVOID param)
...{
int i = (int)param;
RWLock lock(&mc2); //加读锁
if(lock.IsLockSuccess()) //如果加锁成功
{
Say("read thread %d started", i); //为了代码短一些,假设Say函数有这种能力
Sleep(1000);
Say("read thread %d ended", i);
}
else //加锁超时,则显示超时信息
Say("read thread %d timeout", i);
return 0;
}
//写线程
DWORD CALLBACK WriteThreadProc(LPVOID param)
...{
int i = (int)param;
RWLock lock(&mc2, false); //加写锁。
if(lock.IsLockSuccess())
...{
Say("write thread %d started", i);
Sleep(600);
Say("write thread %d ended", i);
}
else
Say("write thread %d timeout", i);
return 0;
}

主线程:

int i;
for(i = 0; i < 5; i++)
::CreateThread(0, 0, ReadThreadProc, (LPVOID)i, 0, 0);
for(i = 0; i < 5; i++)
::CreateThread(0, 0, WriteThreadProc, (LPVOID)i, 0, 0);

程序共开10个线程,5个读5个写。从RWLockBase类继承时我们使用了默认的模板参数,所以最多同时允许3个读线程。程序的运行结果如下:

001 [15:07:28.484]read thread 0 started
002 [15:07:28.484]read thread 1 started
003 [15:07:28.484]read thread 2 started
004 [15:07:29.484]read thread 0 ended
005 [15:07:29.484]read thread 3 started
006 [15:07:29.484]read thread 1 ended
007 [15:07:29.484]read thread 4 started
008 [15:07:29.484]read thread 2 ended
009 [15:07:30.484]read thread 3 ended
010 [15:07:30.484]read thread 4 ended
011 [15:07:30.484]write thread 0 started
012 [15:07:31.078]write thread 0 ended
013 [15:07:31.078]write thread 1 started
014 [15:07:31.484]write thread 2 timeout
015 [15:07:31.484]write thread 3 timeout
016 [15:07:31.484]write thread 4 timeout
017 [15:07:31.687]write thread 1 ended

前三行三个读线程取得读锁,之后等一秒(第4-8行),三个读线程都结束了,并且余下的两个读线程取得读锁,虽然这时剩下了一个互斥量没有使用,但因为其他的线程都请求加写锁,写锁与其他所有线程互斥,所以还不能取得写锁。再过一秒(第9-11行),后来的两个取得读锁的线程也结束了,则第一个写线程取得写锁。600毫秒之后(第12-13行)第一个写线程结束,第二个写线程开始。400毫秒之后(第14-16行)余下的三个写线程都超时了,再后第二个写线程也结束了。

给我留言

留言无头像?