现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

evpp网络库代码分析(二)

2020-04-27 15:35 工业·编程 ⁄ 共 3868字 ⁄ 字号 暂无评论

下图是盗用自《Linux多线程服务端编程,使用muduo C++网络库》一书6.6.2章节(以及下面的时序图也是盗用该书的图)。该图列举出大部分常用的网络编程模型,当然了,这里并没有列出Boost.Asio的proactor模式。

2020080218105936

其中表中的“互通”是指多个客户端(连接)间是否能方便地交换数据,如chat聊天程序;UNP是指经典的《Unix网络编程卷一:套接字联网API》一书章节。而我们的evpp库实际上是用到了“方案9”,方案9的时序图如下:

20200802181530948

可以看出,每一个线程有一个EventLoop处理事件。这种方案是典型的“one loop per thread”流程,有一个“主EventLoop”负责accept连接,然后把连接通过round-robin(轮询调度)挂到底下的多个“子EventLoop”中(EventLoopThreadPool),每个连接都是由一个“子EventLoop”完成的,能保证请求的顺序性,也可以充分利用CPU,防止出现一个reactor的处理能力饱和,而且EventLoop线程池线程数量固定,不会因为连接数过多到达临界点(线程太多导致操作系统线程调度不过来)而性能下降!

相关阅读

    ----evpp网络库代码分析(一)

另外还有一种网络编程模型比较常用,就是“方案8”,这个模型在muduo中有现成方案,而在evpp中需要自己实现,时序图如下:

20200802183510869

这种方案,只有一个EventLoop,但它把计算密集的部分(decode、compute、encode)分派到一个线程池中处理,处理完后再返回到EventLoop中,这样即使decode、compute、encode这三部分是阻塞的也不影响并发连接,正因为是异步处理,导致打乱了返回的顺序性,即同为连接1(同一个连接),先后下发请求1(如Conn 1 readaable1)和请求2(如Conn 1 readaable2),有可能请求2先于请求1返回数据(响应)。如果处理的事情没有优先级之分或者计算密集型(大部分时间都是处于计算中)可以使用这种方案,如果有优先级之分,应该使用方案9,方案9算是一个各方面都比较均衡的网络编程模式,evpp库就优先使用这种模式!

    evpp中比较重要的基础类有

EventLoop

EventWatcher

Buffer等

    EventLoop相关的类包含EventLoop、EventLoopThread和EventLoopThreadPool等类,它们提供事件循环、事件分发、为reactor网络编程模式提供一个基础框架。也提供了定时器,以及RunInLoop等接口,RunInLoop可在多线程环境下被调用,在本eventloop的IO线程内执行某个用户任务回调。我们提倡one loop per thread,顾名思义每个线程只能有一个EventLoop对象,因此EventLoop的构造函数会检查当前线程是否已经创建了其他EventLoop对象,遇到错误就终止程序。

    EventWatcher相关的类包含PipeEventWatcher、TimerEventWatcher和SignalEventWatcher能够注册管道、超时以及信号事件,底层是libevent2库。

    Buffer类则是类似libevent2中的evbuffer,供应用层读写,底下的socket读写操作也会频繁使用它。

我们可以先阅读代码,证明evpp库是上述“方案9”的具体实现:

int main(int argc, char* argv[]) {

    std::string addr = "0.0.0.0:9099";

    int thread_num = 4;

    evpp::EventLoop loop;

    evpp::TCPServer server(&loop, addr, "TCPPingPongServer", thread_num);

    server.SetMessageCallback(&OnMessage);

    server.SetConnectionCallback(&OnConnection);

    server.Init();

    server.Start();

    loop.Run();

    return 0;

}

这是一个简单的server端代码,先是创建了一个EventLoop对象loop,然后把用loop创建TCPServer对象,继续跟进去发现这个loop对象就是“方案9”时序图里的“Base IO Thread”。

我们再来看TCPServer对象的构造函数:

TCPServer::TCPServer(EventLoop* loop,

                     const std::string& laddr,

                     const std::string& name,

                     uint32_t thread_num)

    : loop_(loop)

    , listen_addr_(laddr)

    , name_(name)

    , conn_fn_(&internal::DefaultConnectionCallback)

    , msg_fn_(&internal::DefaultMessageCallback)

    , next_conn_id_(0) {

    tpool_.reset(new EventLoopThreadPool(loop_, thread_num));

}

它用“Base IO Thread”和线程数量作为参数创建了一个EventLoopThreadPool智能指针对象tpool_。

我们看回main函数,它注册好必要的回调(后文会说到)后,调用server.Init()和server.Start()。

init函数主要是创建了一个Listener对象,然后监听socket:

bool TCPServer::Init() {

    listener_.reset(new Listener(loop_, listen_addr_));

    listener_->Listen();

    return true;

}

而Start函数则是重要函数:

bool TCPServer::Start() {

    bool rc = tpool_->Start(true);

    if (rc) {

        listener_->SetNewConnectionCallback(

            std::bind(&TCPServer::HandleNewConn,

                      this,

                      std::placeholders::_1,

                      std::placeholders::_2,

                      std::placeholders::_3));

        listener_->Accept();

    }

    return rc;

}

它让EventLoopThreadPool启动,并且注册一个回调,用于当accept到一个客户端连接后就回调。

最后我们来看下当Listener对象accept到一个连接后,回调函数HandleNewConn做了什么:

void TCPServer::HandleNewConn(evpp_socket_t sockfd,

                              const std::string& remote_addr/*ip:port*/,

                              const struct sockaddr_in* raddr) {

    if (IsStopping()) {

        EVUTIL_CLOSESOCKET(sockfd);

        return;

    }

    EventLoop* io_loop = GetNextLoop(raddr);

    std::string n = remote_addr;

    ++next_conn_id_;

    TCPConnPtr conn(new TCPConn(io_loop, n, sockfd, listen_addr_, remote_addr, next_conn_id_));

    conn->SetMessageCallback(msg_fn_);

    conn->SetConnectionCallback(conn_fn_);

    conn->SetCloseCallback(std::bind(&TCPServer::RemoveConnection, this, std::placeholders::_1));

    io_loop->RunInLoop(std::bind(&TCPConn::OnAttachedToLoop, conn));

    connections_[conn->id()] = conn;

}

里面有一个重要函数GetNextLoop(),该函数返回的io_loop就是“方案9”时序图里面的“IO Thread n”,最后 利用io_loop这个EventLoop对象创建一个TCPConn智能指针对象等待io,跟“方案9”的思路如出一辙,也是“one loop per thread”,一个EventLoop挂一个连接。

 

给我留言

留言无头像?