现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

POCO库 Foundation::Thread模块 多线程与线程池支持

2019-07-31 07:00 工业·编程 ⁄ 共 25896字 ⁄ 字号 暂无评论

本节主要介绍Thread类和ThreadLocal机制的使用方法以及实现原理,以及对ThreadPool线程池支持的简单了解

Thread类使用方法

在C++语言中,我们通过_beginThreadex或CreateThread来创建线程(最好使用前者,关于两者区别和线程基础知识可参见《Windows核心编程》),并且提供一个原型为void MyFunc(void pParam)入口函数来完成任务。在Poco中,将入口函数抽象为一个类Runnable,该类提供void run()接口,用户需要继承至该类来实现自定义的入口函数。Poco将线程也抽象为一个类Thread,提供了start, join等方法。

一个Thread使用例子如下:

#include "Poco/Thread.h"

#include "Poco/Runnable.h"

#include <iostream>

class HelloRunnable: public Poco::Runnable

{

virtual void run()

    {

std::cout << "Hello, world!" << std::endl;

    }

};

int main(int argc, char** argv)

{

    HelloRunnable runnable;

    Poco::Thread thread;

    thread.start(runnable);//传入对象而不是对象指针

    thread.join();

return 0;

}

定义一个Thread对象,调用其start方法并传入一个Runnable对象来启动线程,使用的方法比较简单,另外,如果你的线程的入口函数在另一个已定义好的类中,那么Poco提供了一个适配器来使线程能够从你指定的入口启动,并且无需修改已有的类

#include "Poco/Thread.h"

#include "Poco/RunnableAdapter.h"

#include <iostream>

class Greeter

{

public:

void greet()

   {

std::cout << "Hello, world!" << std::endl;

   }

};

int main(int argc, char** argv)

{

    Greeter greeter;

    Poco::RunnableAdapter<Greeter> runnable(greeter, &Greeter::greet);

    Poco::Thread thread;

    thread.start(runnable);

    thread.join();//等待该线程技术

return 0;

}

看完了其使用方法之后,我们来查看其内部实现。

Thread和Runnable如何工作

先看看thread.start是怎么启动一个新线程的:

在Poco-1.4.6/Foundation/src/Thread_WIN32中找到start的实现startImpl:

void ThreadImpl::startImpl(Runnable& target)

{

if (isRunningImpl())

throw SystemException("thread already running");

_pRunnableTarget = ⌖ //记录入口

createImpl(runnableEntry, this);

}

该函数先判断线程是否正在运行,然后将Runnable对象指针存入成员_pRunnableTarget中,之后调用createImpl函数,并传入runnableEntry函数地址和this指针

void ThreadImpl::createImpl(Entry ent, void* pData)

{

#if defined(_DLL)

_thread = CreateThread(NULL, _stackSize, ent, pData, 0, &_threadId);

#else

unsigned threadId;

_thread = (HANDLE) _beginthreadex(NULL, _stackSize, ent, this, 0, &threadId);

_threadId = static_cast<DWORD>(threadId);

#endif

if (!_thread)

throw SystemException("cannot create thread");

if (_prio != PRIO_NORMAL_IMPL && !SetThreadPriority(_thread, _prio))

throw SystemException("cannot set thread priority");

}

其中Entry ent参数也就是runnableEntry函数代码如下:

#if defined(_DLL)

DWORD WINAPI ThreadImpl::runnableEntry(LPVOID pThread)

#else

unsigned __stdcall ThreadImpl::runnableEntry(void* pThread)

#endif

{

_currentThreadHolder.set(reinterpret_cast<ThreadImpl*>(pThread));

#if defined(_DEBUG) && defined(POCO_WIN32_DEBUGGER_THREAD_NAMES)

setThreadName(-1, reinterpret_cast<Thread*>(pThread)->getName().c_str());

#endif

try

{

reinterpret_cast<ThreadImpl*>(pThread)->_pRunnableTarget->run();

}

catch (Exception& exc)

{

ErrorHandler::handle(exc);

}

catch (std::exception& exc)

{

ErrorHandler::handle(exc);

}

catch (...)

{

ErrorHandler::handle();

}

return 0;

}

可以看出,createImpl负责创建线程,并且把入口函数runnableEntry作为线程入口,将this指针作为参数。在runnableEntry中,首先将pThread也就是代表该线程的Threal对象地址放入_currentThreadHolder中,static CurrentThreadHolder _currentThreadHolder;是一个静态数据成员,它的存在是为了方便程序在任何环境下通过Thread::current来获取当前运行线程所属的Thread对象指针。CurrentThreadHolder是ThreadImpl的一个内嵌类,它通过线程的TLS机制将线程的Thread指针放入TLS数组的某个槽中(_slot),并提供存取(set)和获取(get)方法,源码如下:

class CurrentThreadHolder

{

public:

CurrentThreadHolder(): _slot(TlsAlloc())

{

if (_slot == TLS_OUT_OF_INDEXES)

throw SystemException("cannot allocate thread context key");

}

~CurrentThreadHolder()

{

TlsFree(_slot);

}

ThreadImpl* get() const

{

return reinterpret_cast<ThreadImpl*>(TlsGetValue(_slot));

}

void set(ThreadImpl* pThread)

{

TlsSetValue(_slot, pThread);

}

private:

DWORD _slot;

};

runnableEntry在通过_currentThreadHolder存取了Thread指针之后,便开始调用用户在Runnable类中定义的run函数。

ThreadImpl类还提供了一系列线程相关的方法:

void ThreadImpl::joinImpl()

{

if (!_thread) return;

switch (WaitForSingleObject(_thread, INFINITE))

{

case WAIT_OBJECT_0:

threadCleanup();

return;

default:

throw SystemException("cannot join thread");

}

}

bool ThreadImpl::joinImpl(long milliseconds)

{

if (!_thread) return true;

switch (WaitForSingleObject(_thread, milliseconds + 1))

{

case WAIT_TIMEOUT:

return false;

case WAIT_OBJECT_0:

threadCleanup();

return true;

default:

throw SystemException("cannot join thread");

}

}

bool ThreadImpl::isRunningImpl() const

{

if (_thread)

{

DWORD ec = 0;

return GetExitCodeThread(_thread, &ec) && ec == STILL_ACTIVE;

}

return false;

}

void ThreadImpl::threadCleanup()

{

if (!_thread) return;

if (CloseHandle(_thread)) _thread = 0;

}

ThreadImpl* ThreadImpl::currentImpl()

{

return _currentThreadHolder.get();

}

ThreadImpl::TIDImpl ThreadImpl::currentTidImpl()

{

return GetCurrentThreadId();

}

RunnableAdapter适配器:

下面我们再看看RunnableAdapter是如何运用适配器模式的,在Poco-1.4.6/Foundation/Include/RunnableAdaper.h中找到RunnableAdaper类的实现:

template <class C>

class RunnableAdapter: public Runnable

/// This adapter simplifies using ordinary methods as

/// targets for threads.

/// Usage:

///    RunnableAdapter<MyClass> ra(myObject, &MyObject::doSomething));

///    Thread thr;

///    thr.Start(ra);

///

/// For using a freestanding or static member function as a thread

/// target, please see the ThreadTarget class.

{

public:

typedef void (C::*Callback)();

RunnableAdapter(C& object, Callback method): _pObject(&object), _method(method)

{

}

RunnableAdapter(const RunnableAdapter& ra): _pObject(ra._pObject), _method(ra._method)

{

}

~RunnableAdapter()

{

}

RunnableAdapter& operator = (const RunnableAdapter& ra)

{

_pObject = ra._pObject;

_method  = ra._method;

return *this;

}

void run()

{

(_pObject->*_method)();

}

private:

RunnableAdapter();

C*       _pObject;

Callback _method;

};

可以看出,这里是一个经典的对象适配器模式的运用,关于适配器模式可见参考文章:http://www.cnblogs.com/houleixx/archive/2008/03/04/1090214.html

Thread的另一个接口:直接传入通过函数和参数

Thread::start除了接收Runnable对象之外,还可以传入函数和参数,向_beginThreadex和CreateThread那样,start原型如下:

typedef void (*Callable)(void*);

void start(Callable target, void* pData = 0);

使用范例:

#include <iostream>

#include "Poco/Thread.h"

#include "Poco/ThreadLocal.h"

#include "Poco/Runnable.h"

using namespace std;

using namespace Poco;

void sayHello(void* name)

{

cout<<"Hello "<<(char*)name<<endl;

}

void main()

{

static char* name = "DJWu";

Thread thr;

thr.start(sayHello, name);

thr.join();

return ;

}

现在我们来看看这种情况下Thread::start是如何工作的:

在Foundation/src/Thread_WIN32.cpp中找到startImpl的另一个重载源码:

void ThreadImpl::startImpl(Callable target, void* pData)

{

if (isRunningImpl())

throw SystemException("thread already running");

threadCleanup();

_callbackTarget.callback = target;

_callbackTarget.pData = pData;

createImpl(callableEntry, this);

}

    startImpl将用户定义的参数和入口函数放入一个成员结构体_callbackTarget中,然后调用createImpl,由于这里传入的callableEntry和前面Runnable版本的startImpl传入的runnableEntry函数原型是一致的(定义在Foundation/Include/Thread_WIN32.h中):

#if defined(_DLL)

static DWORD WINAPI runnableEntry(LPVOID pThread);

#else

static unsigned __stdcall runnableEntry(void* pThread);

#endif

#if defined(_DLL)

static DWORD WINAPI callableEntry(LPVOID pThread);

#else

static unsigned __stdcall callableEntry(void* pThread);

#endif

它们的原型与Entry一致:

#if defined(_DLL)

typedef DWORD (WINAPI *Entry)(LPVOID);

#else

typedef unsigned (__stdcall *Entry)(void*);

#endif

因此它们调用的是同一个createImpl(createImpl也没有重载),这里再次将createImpl贴出来:

void ThreadImpl::createImpl(Entry ent, void* pData)

{

#if defined(_DLL)

_thread = CreateThread(NULL, _stackSize, ent, pData, 0, &_threadId);

#else

unsigned threadId;

_thread = (HANDLE) _beginthreadex(NULL, _stackSize, ent, this, 0, &threadId);

_threadId = static_cast<DWORD>(threadId);

#endif

if (!_thread)

throw SystemException("cannot create thread");

if (_prio != PRIO_NORMAL_IMPL && !SetThreadPriority(_thread, _prio))

throw SystemException("cannot set thread priority");

}

此时线程的真正入口callableEntry如下:

#if defined(_DLL)

DWORD WINAPI ThreadImpl::callableEntry(LPVOID pThread)

#else

unsigned __stdcall ThreadImpl::callableEntry(void* pThread)

#endif

{

_currentThreadHolder.set(reinterpret_cast<ThreadImpl*>(pThread));

#if defined(_DEBUG) && defined(POCO_WIN32_DEBUGGER_THREAD_NAMES)

setThreadName(-1, reinterpret_cast<Thread*>(pThread)->getName().c_str());

#endif

try

{

ThreadImpl* pTI = reinterpret_cast<ThreadImpl*>(pThread);

pTI->_callbackTarget.callback(pTI->_callbackTarget.pData);

}

catch (Exception& exc)

{

ErrorHandler::handle(exc);

}

catch (std::exception& exc)

{

ErrorHandler::handle(exc);

}

catch (...)

{

ErrorHandler::handle();

}

return 0;

}

这里面和runnableEntry做相似的工作:先保存该线程对应的Thread对象指针,再调用用户指定的入口,前面用用户指定的对象调用run函数,这里用_callbackTarget中的函数地址和参数启动函数。

综合这两种启动线程的方式,它们的入口并不直接是用户指定的入口,而是runnableEntry或者callbackEntry,它们做了一些额外工作:

1.保存当前线程对应的Thread对象指针(通过TLS机制)

2.如果是在调试状态,则可以给线程设置名字(可通过Thread::setName指定)

3.为线程运行设置异常帧

线程本地存储:ThreadLocal类

    ThreadLocal类为开发者提供了更为简洁的TLS机制使用方法,TLS机制用来保存这样一些变量:它们在不同的线程里有不同的值,并且各自维护,线程不能访问其他线程中的这些变量。

关于TLS机制可参见《Windows核心编程》和这篇文章:http://www.cnblogs.com/stli/archive/2010/11/03/1867852.html

   ThreadLocal使用方法:

#include "Poco/Thread.h"

#include "Poco/Runnable.h"

#include "Poco/ThreadLocal.h"

#include <iostream>

class Counter: public Poco::Runnable

{

void run()

   {

static Poco::ThreadLocal<int> tls;

for (*tls = 0; *tls < 10; ++(*tls))

        {

std::cout << *tls << std::endl;

        }

    }

};

int main(int argc, char** argv)

{

    Counter counter;

    Poco::Thread t1;

    Poco::Thread t2;

    t1.start(counter);//这两句官方文档上有错,文档上是t1.start(); t2.start();

    t2.start(counter);

    t1.join();

    t2.join();

return 0;

}

使用ThreadLocal模板类可以保存任何变量(只需提供默认构造函数),并且通过*和->来进行很方便的存取。使用方法一目了然,避开了相对繁琐的TlsAlloc,TlsSetValue,TlsGetValue,其实ThreadLocal内部也并没有使用线程的TLS机制。来看看其内部实现。在Foundation/Include/Poco/ThreadLocal.h和Foundation/src/ThreadLocal.cpp中,我们找到四个相关类,为了解释方便,我将ThreadLocal.cpp中比较重要的函数实现一起放在了ThreadLocal.h中:

class Foundation_API TLSAbstractSlot  //该类用于抽象TLSSlot模板类,并没有实际接口

/// This is the base class for all objects

/// that the ThreadLocalStorage class manages.

{

public:

TLSAbstractSlot();

virtual ~TLSAbstractSlot();

};

template <class C>

class TLSSlot: public TLSAbstractSlot  //该类实际代表了对ThreadLocal对象所保存的值(模板参数也由ThreadLocal提供) 并且给出了值的存取过程(注意value()函数返回的是引用)

/// The Slot template wraps another class

/// so that it can be stored in a ThreadLocalStorage

/// object. This class is used internally, and you

/// must not create instances of it yourself.

{

public:

TLSSlot():_value(){}

~TLSSlot(){}

C& value(){return _value;}

private:

TLSSlot(const TLSSlot&);

TLSSlot& operator = (const TLSSlot&);

C _value;

};

class Foundation_API ThreadLocalStorage

    //该类维系一个map<ThreadLocal<C>*, TLSSlot<C>*>这是实现ThreadLocal的关键

    //ThreadLocal类通过传入this指针来获取自身所代表的值(一个ThreadLocal对象对应代表一个值)

/// This class manages the local storage for each thread.

/// Never use this class directly, always use the

/// ThreadLocal template for managing thread local storage.

{

public:

ThreadLocalStorage(){}

/// Creates the TLS.

~ThreadLocalStorage()

/// Deletes the TLS.

{

for (TLSMap::iterator it = _map.begin(); it != _map.end(); ++it)

{

delete it->second;

}

}

TLSAbstractSlot*& get(const void* key)

//通过传入的ThreadLocal<C>*指针在_map中查找对应的TLSSlot<C>指针,注意在ThreadLocal对象定义时

//并不会立即将ThreadLocal对象和一个TLSSlot关联起来,而是在第一次对其使用*或者->获取其值时,

//也就是第一次调用本函数时,如果在_map中没有找到其对应值,才将ThreadLocal指针和一个NULL指针插入_map

//然后返回NULL。由于返回的指针引用,因此在之外对返回值作的修改也会修改_map中的值

/// Returns the slot for the given key.

{

TLSMap::iterator it = _map.find(key);

if (it == _map.end())//没找到 插入并返回空指针

return _map.insert(TLSMap::value_type(key, reinterpret_cast<Poco::TLSAbstractSlot*>(0))).first->second;

else

return it->second;

}

static ThreadLocalStorage& current()

/// Returns the TLS object for the current thread

/// (which may also be the main thread).

{

Thread* pThread = Thread::current();

if (pThread)

{

return pThread->tls();

//附Thread::tls()代码:

//ThreadLocalStorage& Thread::tls()

//{

// if (!_pTLS)

// _pTLS = new ThreadLocalStorage;

// return *_pTLS;

//}

}

else

{

return *sh.get();

//static SingletonHolder<ThreadLocalStorage> sh;是一个全局静态变量

//SingletonHolder是一个单例模式容器 如果pThread为NULL,则说明当前线程是主线程

//sh是为主线程准备的ThreadLocalStorage

}

}

static void clear()

/// Clears the current thread's TLS object.

/// Does nothing in the main thread.

{

Thread* pThread = Thread::current();

if (pThread)

pThread->clearTLS();

//附Thread::clearTls()代码:

//void Thread::clearTLS()

//{

// if (_pTLS)

// {

// delete _pTLS;

// _pTLS = 0;

// }

//}

}

private:

typedef std::map<const void*, TLSAbstractSlot*> TLSMap;

TLSMap _map;

friend class Thread;

};

template <class C>

class ThreadLocal //ThreadLocal完成对自身所代表的值的一层封装 值的获取在ThreadLocalStorage中完成

/// This template is used to declare type safe thread

/// local variables. It can basically be used like

/// a smart pointer class with the special feature

/// that it references a different object

/// in every thread. The underlying object will

/// be created when it is referenced for the first

/// time.

/// See the NestedDiagnosticContext class for an

/// example how to use this template.

/// Every thread only has access to its own

/// thread local data. There is no way for a thread

/// to access another thread's local data.

{

typedef TLSSlot<C> Slot;

public:

ThreadLocal()

{

}

~ThreadLocal()

{

}

C* operator -> ()

{

return &get();

}

C& operator * ()

/// "Dereferences" the smart pointer and returns a reference

/// to the underlying data object. The reference can be used

/// to modify the object.

{

return get();

}

C& get()

/// Returns a reference to the underlying data object.

/// The reference can be used to modify the object.

{

//在当前线程的ThreadLocalStorage类中通过this指针在map中查找其代表的值

//注意ThreadLocalStorage::get(this)返回的是TLSSlot<C>*指针的引用

//因此对返回指针引用p的修改会直接影响到ThreadLocalStorage::_map中的值

TLSAbstractSlot*& p = ThreadLocalStorage::current().get(this);

if (!p) p = new Slot;

return static_cast<Slot*>(p)->value();

}

private:

ThreadLocal(const ThreadLocal&);

ThreadLocal& operator = (const ThreadLocal&);

};

看起来这四个类以及Thread类之间的交互有些麻烦,但是实际这主要是为了明确各个类的职责:

对于Thread类,它维护一个ThreadLocalStorage* _pTLS指针,负责它本身的TLS类的分配(tls())和释放(clearTls())

对于ThreadLocalStorage类 它是整个Thread TLS机制的核心,它从友元类Thread获取当前运行线程的_pTLS指针,并且在该ThreadLocalStorage里寻找传入的ThreadLocal指针所代表的值,如果找不到,则插入一个pair,将该second设为NULL, 并且返回TLSAbstractSlot*指针的引用。

对于TLSAbstractSlot类,它的主要功能就是抽象TLSSlot<C>模板类,这样ThreadLocalStorage可以返回统一接口,而不用再成为模板类(如果这样,那么Thread类维护_pTLS也会比较困难,因为模板类实例化需要提供模板参数)。

       TLSSlot类代表ThreadLocal代表的值,并负责该值的读取(只有value()方法而没有setValue()方法),注意在使用ThreadLocal时,需要先声明再赋值,而不是直接初始化,因为如果ThreadLocal<int> a = 3;  a实际上是ThreadLocal对象,而不是int的引用。正确使用应该是ThreadLocal<int> a; *a = 3;这也是之前说使用ThreadLocal作为TLS值的类要求必须要有默认构造函数的原因。

还有注意的是,在整个类之间的传递过程中,基本都是返回的指针引用,这样才能一处修改,影响到其他组件的同步修改。

      Poco的ThreadLocal机制并没有使用线程的TLS机制,而是将TLS数据放在了Thread类中(确切说是其维护的_pTLS指针中,对于主线程,其并没有对应Thread类,因此为其定制了一个全局静态单例的ThreadLocalStorage对象)。

ThreadPool线程池支持

POCO为我们提供了线程池的接口,关于线程池的优缺点和适用情形这里不再讨论,网上也有很多各式的线程池实现,POCO的线程池自然是基于前面介绍的多线程结构的。

简单地说,POCO线程池主要有两个类,PooledThread和ThreadPool,前者是线程池中的线程,负责执行线程池分配下来的任务,它基于Thread和Runnable机制。后者是线程池类,它负责对线程池中的各个线程进行维护(创建,分配,回收,清除等)。

先看看PooledThread的主要接口:

文件位置:poco-1.4.6/Foundation/src/ThreadPool.cpp

class PooledThread: public Runnable

{

public:

PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);

~PooledThread();

void start();//线程处于就绪(空闲)状态,当入口被设定后(通过下面两个start),即可开始任务(由_targetReady信号量控制)。

void start(Thread::Priority priority, Runnable& target);//为线程设定优先级和入口

void start(Thread::Priority priority, Runnable& target, const std::string& name);//为线程设定优先级,入口和名字

bool idle();//返回是否是空闲线程

int idleTime();//空闲时间

void join();//等待结束

void activate();//激活线程 将线程由就绪(空闲)改为忙碌(_idle=false)

void release();//销毁自身

void run();//自定义入口 在start()中调用 它等待_targetReady信号量 并执行真正的线程入口_pTarget->run();

private:

volatile bool        _idle;//线程是否空闲

volatile std::time_t _idleTime;//线程本次空闲开始时刻

Runnable*            _pTarget;//线程入口

std::string          _name;//线程名字(可选)

Thread               _thread;//线程对象

Event                _targetReady;//任务是否准备好 即_pTarget是否有效

Event                _targetCompleted;//任务是否执行完毕 即_pTarget->run()是否执行完成

Event                _started;//线程是否已经开始

FastMutex            _mutex;//提供对_pTarget的互斥访问

};

下面是PooledThread的一些主要函数实现:

void PooledThread::start()

{

_thread.start(*this);

_started.wait();

}

void PooledThread::start(Thread::Priority priority, Runnable& target)

{

FastMutex::ScopedLock lock(_mutex);

poco_assert (_pTarget == 0);

_pTarget = &target;

_thread.setPriority(priority);

_targetReady.set();

}

void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name)

{

FastMutex::ScopedLock lock(_mutex);

std::string fullName(name);

if (name.empty())

{

fullName = _name;

}

else

{

fullName.append(" (");

fullName.append(_name);

fullName.append(")");

}

_thread.setName(fullName);

_thread.setPriority(priority);

poco_assert (_pTarget == 0);

_pTarget = &target;

_targetReady.set();

}

inline bool PooledThread::idle()

{

return _idle;

}

int PooledThread::idleTime()

{

FastMutex::ScopedLock lock(_mutex);

#if defined(_WIN32_WCE)

return (int) (wceex_time(NULL) - _idleTime);

#else

return (int) (time(NULL) - _idleTime);

#endif

}

void PooledThread::join()

{

_mutex.lock();

Runnable* pTarget = _pTarget;

_mutex.unlock();

if (pTarget)

_targetCompleted.wait();//等待本次任务结束

}

void PooledThread::activate()

{

FastMutex::ScopedLock lock(_mutex);

poco_assert (_idle);

_idle = false;//忙碌状态

_targetCompleted.reset();//_targetCompeleted信号量无效 等待任务分配

}

void PooledThread::release()

{

const long JOIN_TIMEOUT = 10000;

_mutex.lock();

_pTarget = 0;

_mutex.unlock();

_targetReady.set();//_targetReady信号量有效 而_pTarget=0; 此时在pooledThread:run()中将跳出无线循环 结束自身

if (_thread.tryJoin(JOIN_TIMEOUT))

{

delete this;

}

}

void PooledThread::run()

{

_started.set();

for (;;)//不断等待并执行分配的任务 通过_targetReady判断是否有新的任务

{

_targetReady.wait();

_mutex.lock();

if (_pTarget) //当_pTarget=0;将跳出无限循环 即结束自身

{

_mutex.unlock();

try

{

_pTarget->run();

}

catch (Exception& exc)

{

ErrorHandler::handle(exc);

}

catch (std::exception& exc)

{

ErrorHandler::handle(exc);

}

catch (...)

{

ErrorHandler::handle();

}

FastMutex::ScopedLock lock(_mutex);

_pTarget  = 0;

#if defined(_WIN32_WCE)

_idleTime = wceex_time(NULL);

#else

_idleTime = time(NULL);

#endif

_idle     = true;//执行完成后,重新设为空闲状态

_targetCompleted.set();

ThreadLocalStorage::clear();

_thread.setName(_name);

_thread.setPriority(Thread::PRIO_NORMAL);

}

else

{

_mutex.unlock();

break;

}

}

}

      PooledThread通过维护一个Thread对象和一些信号量控制来完成对Thread对象的复用,PooledThread类从Runnable派生,这样就可以通过定义run()方法来反复执行任务,而实际上每次执行的任务是定义在成员Runnable* _pTarget中。

而ThreadPool就更为简单了,它负责任务的分配,线程的管理。接口如下:

文件位置:poco-1.4.6/Foundation/Include/poco/ThreadPool.h

class Foundation_API ThreadPool

/// A thread pool always keeps a number of threads running, ready

/// to accept work.

/// Creating and starting a threads can impose a significant runtime

/// overhead to an application. A thread pool helps to improve

/// the performance of an application by reducing the number

/// of threads that have to be created (and destroyed again).

/// Threads in a thread pool are re-used once they become

/// available again.

/// The thread pool always keeps a minimum number of threads

/// running. If the demans for threads increases, additional

/// threads are created. Once the demand for threads sinks

/// again, no-longer used threads are stopped and removed

/// from the pool.

{

public:

ThreadPool(int minCapacity = 2,

int maxCapacity = 16,

int idleTime = 60,

int stackSize = POCO_THREAD_STACK_SIZE);

/// Creates a thread pool with minCapacity threads.

/// If required, up to maxCapacity threads are created

/// a NoThreadAvailableException exception is thrown.

/// If a thread is running idle for more than idleTime seconds,

/// and more than minCapacity threads are running, the thread

/// is killed. Threads are created with given stack size.

ThreadPool(const std::string& name,

int minCapacity = 2,

int maxCapacity = 16,

int idleTime = 60,

int stackSize = POCO_THREAD_STACK_SIZE);

/// Creates a thread pool with the given name and minCapacity threads.

/// If required, up to maxCapacity threads are created

/// a NoThreadAvailableException exception is thrown.

/// If a thread is running idle for more than idleTime seconds,

/// and more than minCapacity threads are running, the thread

/// is killed. Threads are created with given stack size.

~ThreadPool();

/// Currently running threads will remain active

/// until they complete.

void addCapacity(int n);

/// Increases (or decreases, if n is negative)

/// the maximum number of threads.

int capacity() const;

/// Returns the maximum capacity of threads.

void setStackSize(int stackSize);

/// Sets the stack size for threads.

/// New stack size applies only for newly created threads.

int getStackSize() const;

/// Returns the stack size used to create new threads.

int used() const;

/// Returns the number of currently used threads.

int allocated() const;

/// Returns the number of currently allocated threads.

int available() const;

/// Returns the number available threads.

void start(Runnable& target);

/// Obtains a thread and starts the target.

/// Throws a NoThreadAvailableException if no more

/// threads are available.

void start(Runnable& target, const std::string& name);

/// Obtains a thread and starts the target.

/// Assigns the given name to the thread.

/// Throws a NoThreadAvailableException if no more

/// threads are available.

void startWithPriority(Thread::Priority priority, Runnable& target);

/// Obtains a thread, adjusts the thread's priority, and starts the target.

/// Throws a NoThreadAvailableException if no more

/// threads are available.

void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name);

/// Obtains a thread, adjusts the thread's priority, and starts the target.

/// Assigns the given name to the thread.

/// Throws a NoThreadAvailableException if no more

/// threads are available.

void stopAll();

/// Stops all running threads and waits for their completion.

///

/// Will also delete all thread objects.

/// If used, this method should be the last action before

/// the thread pool is deleted.

///

/// Note: If a thread fails to stop within 10 seconds

/// (due to a programming error, for example), the

/// underlying thread object will not be deleted and

/// this method will return anyway. This allows for a

/// more or less graceful shutdown in case of a misbehaving

/// thread.

void joinAll();

/// Waits for all threads to complete.

///

/// Note that this will not actually join() the underlying

/// thread, but rather wait for the thread's runnables

/// to finish.

void collect();

/// Stops and removes no longer used threads from the

/// thread pool. Can be called at various times in an

/// application's life time to help the thread pool

/// manage its threads. Calling this method is optional,

/// as the thread pool is also implicitly managed in

/// calls to start(), addCapacity() and joinAll().

const std::string& name() const;

/// Returns the name of the thread pool,

/// or an empty string if no name has been

/// specified in the constructor.

static ThreadPool& defaultPool();

/// Returns a reference to the default

/// thread pool.

protected:

PooledThread* getThread();//获取线程池中的一个空闲线程

PooledThread* createThread();//创建线程

void housekeep();//清理线程,移除多余的线程

private:

ThreadPool(const ThreadPool& pool);

ThreadPool& operator = (const ThreadPool& pool);

typedef std::vector<PooledThread*> ThreadVec;

std::string _name; //线程池名字

int _minCapacity;  //线程池最小线程容量

int _maxCapacity;  //线程池最大线程容量

int _idleTime;     //线程空闲时间(线程池中空闲时间超过_idleTime的线程可能被移除线程池)

int _serial;

int _age;

int _stackSize;    //线程池中线程的栈大小

ThreadVec _threads;//线程对象数组

mutable FastMutex _mutex;

};

在有新的任务分配时,ThreadPool通过getThread得到(或创建)一个可用的空闲线程对象PooledThread,并调用PooledThread的对应启动函数开始任务。如果此时线程池内的线程都在忙碌中且线程数达到最大容量,将抛出一个NoThreadAvailableException()异常。用户可设置线程池的名字,最小容量,最大容量,并可以手动地清理线程池中的多余空闲线程(houseKeep函数)。ThreadPool的主要函数实现如下:

ThreadPool::ThreadPool(int minCapacity,

int maxCapacity,

int idleTime,

int stackSize):

_minCapacity(minCapacity),

_maxCapacity(maxCapacity),

_idleTime(idleTime),

_serial(0),

_age(0),

_stackSize(stackSize)

{

poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);

for (int i = 0; i < _minCapacity; i++)

{

PooledThread* pThread = createThread();

_threads.push_back(pThread);

pThread->start();//线程处于就绪(空闲)状态 其实是在等待Thread->_targetReady信号量

}

}

ThreadPool::ThreadPool(const std::string& name,

int minCapacity,

int maxCapacity,

int idleTime,

int stackSize):

_name(name),

_minCapacity(minCapacity),

_maxCapacity(maxCapacity),

_idleTime(idleTime),

_serial(0),

_age(0),

_stackSize(stackSize)

{

poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);

for (int i = 0; i < _minCapacity; i++)

{

PooledThread* pThread = createThread();

_threads.push_back(pThread);

pThread->start();

}

}

ThreadPool::~ThreadPool()

{

stopAll();

}

void ThreadPool::addCapacity(int n)

{

FastMutex::ScopedLock lock(_mutex);

poco_assert (_maxCapacity + n >= _minCapacity);

_maxCapacity += n;

housekeep();

}

int ThreadPool::capacity() const

{

FastMutex::ScopedLock lock(_mutex);

return _maxCapacity;

}

int ThreadPool::available() const

{

FastMutex::ScopedLock lock(_mutex);

int count = 0;

for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)

{

if ((*it)->idle()) ++count;

}

return (int) (count + _maxCapacity - _threads.size());

}

int ThreadPool::used() const

{

FastMutex::ScopedLock lock(_mutex);

int count = 0;

for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)

{

if (!(*it)->idle()) ++count;

}

return count;

}

int ThreadPool::allocated() const

{

FastMutex::ScopedLock lock(_mutex);

return int(_threads.size());

}

void ThreadPool::start(Runnable& target)

{

getThread()->start(Thread::PRIO_NORMAL, target);

}

void ThreadPool::start(Runnable& target, const std::string& name)

{

getThread()->start(Thread::PRIO_NORMAL, target, name);

}

void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)

{

getThread()->start(priority, target);

}

void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)

{

getThread()->start(priority, target, name);

}

void ThreadPool::stopAll()

{

FastMutex::ScopedLock lock(_mutex);

for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)

{

(*it)->release();

}

_threads.clear();

}

void ThreadPool::joinAll()

{

FastMutex::ScopedLock lock(_mutex);

for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)

{

(*it)->join();

}

housekeep();//清理线程池

}

void ThreadPool::collect()

{

FastMutex::ScopedLock lock(_mutex);

housekeep();

}

void ThreadPool::housekeep()

{

_age = 0;

if (_threads.size() <= _minCapacity)

return;

ThreadVec idleThreads;

ThreadVec expiredThreads;

ThreadVec activeThreads;

idleThreads.reserve(_threads.size());

activeThreads.reserve(_threads.size());

//将线程池中的线程分为三类:正在运行的 空闲的(空闲时间小于_idleTime) 过期的(空闲时间大于_idleTime)

for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)

{

if ((*it)->idle())

{

if ((*it)->idleTime() < _idleTime)

idleThreads.push_back(*it);

else

expiredThreads.push_back(*it);

}

else activeThreads.push_back(*it);

}

int n = (int) activeThreads.size();

int limit = (int) idleThreads.size() + n;

if (limit < _minCapacity) limit = _minCapacity;//保证线程池中的线程数最少为_minCapacity

idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());

_threads.clear();//清除线程数组(此时线程对象只是被转移,因此不会影响到正在运行的线程)

for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it)

{ //如果忙碌的线程数n小于_minCapacity 那么再添加_minCapacity-n个空闲或过期线程到线程数组

if (n < limit)

{

_threads.push_back(*it);

++n;

}

else (*it)->release();//清除多余的空闲或过期线程

}

_threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end());

}

PooledThread* ThreadPool::getThread()

{

FastMutex::ScopedLock lock(_mutex);

if (++_age == 32)

housekeep();

PooledThread* pThread = 0;

for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it)

{//尝试寻找空闲线程

if ((*it)->idle()) pThread = *it;

}

if (!pThread)

{//如果没有空闲线程

if (_threads.size() < _maxCapacity)

{//还有足够容量 则创建一个新线程

             pThread = createThread();

try

             {

                 pThread->start();

                 _threads.push_back(pThread);

            }

catch (...)

             {

delete pThread;

throw;

             }

}

else throw NoThreadAvailableException();//容量不足 抛出异常

}

pThread->activate();//激活线程(将线程状态由空闲改为忙碌 并重设_targetCompelete信号量)

return pThread;

}

PooledThread* ThreadPool::createThread()

{

std::ostringstream name;

name << _name << "[#" << ++_serial << "]";

return new PooledThread(name.str(), _stackSize);

}

最后,POCO用单例模式提供了一个默认的线程池:

class ThreadPoolSingletonHolder

{

public:

ThreadPoolSingletonHolder()

{

_pPool = 0;

}

~ThreadPoolSingletonHolder()

{

delete _pPool;

}

ThreadPool* pool()

{

FastMutex::ScopedLock lock(_mutex);

if (!_pPool)

{

_pPool = new ThreadPool("default");

if (POCO_THREAD_STACK_SIZE > 0)

_pPool->setStackSize(POCO_THREAD_STACK_SIZE);

}

return _pPool;

}

private:

ThreadPool* _pPool;

FastMutex   _mutex;

};

namespace

{

static ThreadPoolSingletonHolder sh;

}

ThreadPool& ThreadPool::defaultPool()

{

return *sh.pool();

}

参考文档:

poco官方使用文档:http://pocoproject.org/docs/

poco Thread模块官方介绍:http://pocoproject.org/slides/130-Threads.pdf

给我留言

留言无头像?