现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

evpp网络库代码分析(一)

2020-04-26 15:28 工业·编程 ⁄ 共 9988字 ⁄ 字号 评论 1 条

    evpp是奇虎360内部使用的开源多线程网络库,集tcp/udp/http多种协议的服务器和客户端支持。它可以不依赖boost库,使用现代c++14语言(evpp/invoke_timer.cc的lambda表达式使用到了c++14的特性)进行编码。本项目高度参考了muduo网络库,而底层使用现成的libevent库作为事件驱动库,典型的一个reactor网络编程模式的例子,本文就是通过分析evpp源码来达到学习c++网络编程的效果。

    muduo代码我也拜读过,muduo有个特点,它完全是为linux而写的,不支持windows,譬如里面用到了eventfd,timerfd以及epoll等,都是linux系统特有的,而且还跟linux版本有关,系统版本太低也不支持,譬如eventfd等,而evpp做了一定的平台兼容性,得益于libevent库,能一定程度做到支持windows平台。另外,muduo有个base库,是重复造轮子了,其他还好,也是一个不可多得的多线程网络服务器编程demo,值得参考,而evpp没有像muduo那样重新实现一套基础库(如线程库、互斥锁、条件变量等),而是直接利用了c++11/c++14自带的std::thread、std::mutex等,相对通用很多,而且学习这些类库使用在其他项目也能用得着,毕竟是c++的标准类库。

摘抄了github上的说明,大家可以点进去前文给出的github路径阅读Readme:

20200704203733356

20200704203211479

我们给出基础的TCP服务器和客户端的例子,http的例子请自行阅读。

TCP服务器端:

server.cc

#include <evpp/tcp_server.h>

#include <evpp/buffer.h>

#include <evpp/tcp_conn.h>

void OnConnection(const evpp::TCPConnPtr& conn) {

    if (conn->IsConnected()) {

        conn->SetTCPNoDelay(true);

    }

}

void OnMessage(const evpp::TCPConnPtr& conn,

               evpp::Buffer* msg) {

    conn->Send(msg);

}

int main(int argc, char* argv[]) {

    std::string addr = "0.0.0.0:9099";

    int thread_num = 4;

    if (argc != 1 && argc != 3) {

        printf("Usage: %s <port> <thread-num>\n", argv[0]);

        printf("  e.g: %s 9099 12\n", argv[0]);

        return 0;

    }

    if (argc == 3) {

        addr = std::string("0.0.0.0:") + argv[1];

        thread_num = atoi(argv[2]);

    }

    evpp::EventLoop loop;

    evpp::TCPServer server(&loop, addr, "TCPPingPongServer", thread_num);

    server.SetMessageCallback(&OnMessage);

    server.SetConnectionCallback(&OnConnection);

    server.Init();

    server.Start();

    loop.Run();

    return 0;

}

客户端:

client_fixed_size.cc

// Modified from https://github.com/chenshuo/muduo/blob/master/examples/pingpong/client.cc

//

// Every time, we need to receive the whole message and then we can send the next one.

//

#include <iostream>

#include <evpp/tcp_client.h>

#include <evpp/event_loop_thread_pool.h>

#include <evpp/buffer.h>

#include <evpp/tcp_conn.h>

class Client;

class Session {

public:

    Session(evpp::EventLoop* loop,

            const std::string& serverAddr/*ip:port*/,

            const std::string& name,

            size_t block_size,

            Client* owner)

        : client_(loop, serverAddr, name),

        owner_(owner),

        bytes_read_(0),

        bytes_written_(0),

        messages_read_(0),

        block_size_(block_size) {

        client_.SetConnectionCallback(

            std::bind(&Session::OnConnection, this, std::placeholders::_1));

        client_.SetMessageCallback(

            std::bind(&Session::OnMessage, this, std::placeholders::_1, std::placeholders::_2));

    }

    void Start() {

        client_.Connect();

    }

    void Stop() {

        client_.Disconnect();

    }

    int64_t bytes_read() const {

        return bytes_read_;

    }

    int64_t messages_read() const {

        return messages_read_;

    }

private:

    void OnConnection(const evpp::TCPConnPtr& conn);

    void OnMessage(const evpp::TCPConnPtr& conn, evpp::Buffer* buf) {

        ++messages_read_;

        while (buf->size() >= block_size_) {

            bytes_read_ += block_size_;

            bytes_written_ += block_size_;

            conn->Send(buf->data(), block_size_);

            buf->Skip(block_size_);

        }

    }

private:

    evpp::TCPClient client_;

    Client* owner_;

    int64_t bytes_read_;

    int64_t bytes_written_;

    int64_t messages_read_;

    size_t block_size_;

};

class Client {

public:

    Client(evpp::EventLoop* loop,

           const std::string& name,

           const std::string& serverAddr, // ip:port

           int blockSize,

           int sessionCount,

           int timeout_sec,

           int threadCount)

        : loop_(loop),

        name_(name),

        session_count_(sessionCount),

        timeout_(timeout_sec),

        connected_count_(0) {

        loop->RunAfter(evpp::Duration(double(timeout_sec)), std::bind(&Client::HandleTimeout, this));

        tpool_.reset(new evpp::EventLoopThreadPool(loop, threadCount));

        tpool_->Start(true);

        for (int i = 0; i < blockSize; ++i) {

            message_.push_back(static_cast<char>(i % 128));

        }

        for (int i = 0; i < sessionCount; ++i) {

            char buf[32];

            snprintf(buf, sizeof buf, "C%05d", i);

            Session* session = new Session(tpool_->GetNextLoop(), serverAddr, buf, blockSize, this);

            session->Start();

            sessions_.push_back(session);

        }

    }

    ~Client() {

    }

    const std::string& message() const {

        return message_;

    }

    void OnConnect() {

        if (++connected_count_ == session_count_) {

            std::cout << "all connected" << std::endl;

        }

    }

    void OnDisconnect(const evpp::TCPConnPtr& conn) {

        if (--connected_count_ == 0) {

            std::cout << "all disconnected" << std::endl;

            int64_t totalBytesRead = 0;

            int64_t totalMessagesRead = 0;

            for (auto &it : sessions_) {

                totalBytesRead += it->bytes_read();

                totalMessagesRead += it->messages_read();

            }

            std::cout << "name=" << name_ << " " << totalBytesRead << " total bytes read" << std::endl;

            std::cout << "name=" << name_ << " " << totalMessagesRead << " total messages read" << std::endl;

            std::cout << "name=" << name_ << " " << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead) << " average message size" << std::endl;

            std::cout << "name=" << name_ << " " << static_cast<double>(totalBytesRead) / (timeout_ * 1024 * 1024) << " MiB/s throughput" << std::endl;

            loop_->QueueInLoop(std::bind(&Client::Quit, this));

        }

    }

private:

    void Quit() {

        tpool_->Stop();

        loop_->Stop();

        for (auto &it : sessions_) {

            delete it;

        }

        sessions_.clear();

        while (!tpool_->IsStopped() || !loop_->IsStopped()) {

            std::this_thread::sleep_for(std::chrono::seconds(1));

        }

        tpool_.reset();

    }

    void HandleTimeout() {

        std::cout << "stop" << std::endl;

        for (auto &it : sessions_) {

            it->Stop();

        }

    }

private:

    evpp::EventLoop* loop_;

    std::string name_;

    std::shared_ptr<evpp::EventLoopThreadPool> tpool_;

    int session_count_;

    int timeout_;

    std::vector<Session*> sessions_;

    std::string message_;

    std::atomic<int> connected_count_;

};

void Session::OnConnection(const evpp::TCPConnPtr& conn) {

    if (conn->IsConnected()) {

        conn->SetTCPNoDelay(true);

        conn->Send(owner_->message());

        owner_->OnConnect();

    } else {

        owner_->OnDisconnect(conn);

    }

}

int main(int argc, char* argv[]) {

    if (argc != 7) {

        fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time_seconds>\n");

        return -1;

    }

    const char* ip = argv[1];

    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));

    int threadCount = atoi(argv[3]);

    int blockSize = atoi(argv[4]);

    int sessionCount = atoi(argv[5]);

    int timeout = atoi(argv[6]);

    evpp::EventLoop loop;

    std::string serverAddr = std::string(ip) + ":" + std::to_string(port);

    Client client(&loop, argv[0], serverAddr, blockSize, sessionCount, timeout, threadCount);

    loop.Run();

    return 0;

}

    server.cc很简单,收到什么就把返回什么,类似echo服务器,而client_fixed_size.cc比较复杂,下发一大段数据,收到服务器返回的数据后测量平均时间,pingpang性能测试,源码在evpp/benchmark/throughput/evpp/下。看得出来类的使用跟muduo如出一辙,也是抛弃c++的继承,积极使用"std::function+std::bind"或lambda来注册回调函数。作者也是致敬了一番muduo库。

可以在ubuntu下开启两个shell终端,分别运行:

./server 9099 200

以及

./client_fixed_size 127.0.0.1 9099 150 512 100 10

#all disconnected

#name=./client_fixed_size 357193728 total bytes read

#name=./client_fixed_size 697644 total messages read

#name=./client_fixed_size 512 average message size

#name=./client_fixed_size 34.0646 MiB/s throughput

从上面的客户端和服务器代码可以看到,evpp封装了两个比较常用的接口:SetConnectionCallback和SetMessageCallback。这两个回调所要处理的就是所谓的“应用层”,而库是底层,在写应用时,无需关注底层。也就是说evpp库分离了“用户逻辑”与“繁琐的tcp socket读写操作”。linux内核也是大量使用这种注册回调到框架的思路,以此实现“面向对象”和“机制与策略分离”。

1,当TCPClient建立连接或存在的连接断开或建立连接失败时,就会回调我们注册的回调函数:

// Set a connection event relative callback when the TCPClient

// establishes a connection or an exist connection breaks down or failed to establish a connection.

// When these three events happened, the value of the parameter in the callback is:

//      1. Successfully establish a connection : TCPConn::IsConnected() == true

//      2. An exist connection broken down : TCPConn::IsDisconnecting() == true

//      3. Failed to establish a connection : TCPConn::IsDisconnected() == true and TCPConn::fd() == -1

void TCPClient::SetConnectionCallback(const ConnectionCallback& cb);

// Set the message callback to handle the messages from remote server

void TCPClient::SetMessageCallback(const MessageCallback& cb);

2,当TCPServer收到新连接或现有连接中断时,就会回调我们注册的回调函数:

// Set a connection event relative callback when the TCPServer

// receives a new connection or an exist connection breaks down.

// When these two events happened, the value of the parameter in the callback is:

//      1. Received a new connection : TCPConn::IsConnected() == true

//      2. An exist connection broken down : TCPConn::IsDisconnecting() == true

void TCPServer::SetConnectionCallback(const ConnectionCallback& cb);

// Set the message callback to handle the messages from remote client

void TCPServer::SetMessageCallback(MessageCallback cb);

通俗地认为,SetConnectionCallback是处理刚连上或者掉线时的业务,譬如可以在连接上后保存evpp::TCPConnPtr(智能指针),断开连接就reset该TCPConnPtr,使其变为nullptr,用于后续send数据,参考evpp/examples/chatroom/simple底下的简易聊天app:client.cc、codec.h和server.cc:

//参考evpp/examples/chatroom/simple/client.cc

std::mutex mutex_;

evpp::TCPConnPtr connection_;

...

...

void OnConnection(const evpp::TCPConnPtr& conn)

{

    LOG_INFO << conn->AddrToString() << " is " << (conn->IsConnected() ? "UP" : "DOWN");

    std::lock_guard<std::mutex> lock(mutex_);

    if (conn->IsConnected())

    {

      connection_ = conn;

    }

    else

    {

      connection_.reset();

    }

}

void Write(const evpp::Slice& message)

{

    std::lock_guard<std::mutex> lock(mutex_);

    if (connection_)

    {

      evpp::Buffer buf;

      buf.Append(message.data(), message.size());

      buf.PrependInt32(message.size());

      connection_->Send(&buf);

    }

}

...

...

std::string line = "hello world";

Write(line);

而SetMessageCallback则是用于处理接收对端的消息,数据能从evpp::Buffer类中得到,由于使用了reactor模式,这些回调函数都不能阻塞,想要延时一段时间处理的话,不能直接使用sleep(),可以注册超时时间,再比如处理一些会阻塞的任务时,可以自己做一个“任务队列+线程池”,不过这些都是编程手段,跟evpp库无关。

        TCPClient类和TCPServer类怎么使用呢?

作者在tcp_client.h里写得很清楚:

1.创建一个TCPClient对象

2.使用SetConnectionCallback()和SetMessageCallback()设置回调函数

3.调用TCPClient::Connect()去尝试跟远端的服务器建立一个tcp连接

4.使用TCPClient::Send去发送消息

5.在回调函数中处理"连接"和"消息"

6.想要关闭连接,只需调用TCPClient::Disonnect()

而在tcp_server.h上也有描述:

1.创建一个TCPServer对象

2.使用SetConnectionCallback()和SetMessageCallback()设置回调函数

3.调用TCPServer::Init()

4.调用TCPServer::Start()

5.在回调函数中处理"连接"和"消息"

6.最后想要关闭服务,只需调用Server::Stop()

上面列出的使用方法,可以在evpp/examples/*下找例子进行印证!

在分析代码前,要先学会怎么使用!

 

目前有 1 条留言    访客:0 条, 博主:0 条 ,引用: 1 条

    外部的引用: 1 条

    • evpp网络库代码分析(二) | 求索阁

    给我留言

    留言无头像?