evpp is an open-source multi-threaded network library used internally at Qihoo 360. It bundles server and client support for TCP, UDP, and HTTP. It can be built without depending on Boost and is written in modern C++14 (the lambda expressions in evpp/invoke_timer.cc use C++14 features). The project draws heavily on the muduo network library, while using the off-the-shelf libevent library underneath as its event-driven engine, making it a typical example of the Reactor network programming pattern. This article studies the evpp source code as a way to learn C++ network programming.
I have also read through the muduo code. muduo has one notable characteristic: it is written entirely for Linux and does not support Windows. For example, it relies on eventfd, timerfd, and epoll, which are Linux-specific, and some of them also depend on the kernel version; systems that are too old do not support them (eventfd, for instance). evpp, on the other hand, offers a degree of platform compatibility: thanks to libevent it can support Windows to some extent. In addition, muduo ships its own base library, which amounts to reinventing the wheel; apart from that it is a rare and valuable demo of multi-threaded network server programming and well worth studying. evpp does not re-implement such a foundation layer (threads, mutexes, condition variables, and so on) the way muduo does; instead it directly uses std::thread, std::mutex, and friends that come with C++11/C++14. This makes it far more portable, and learning these classes pays off in other projects as well, since they are part of the C++ standard library.
The following is excerpted from the description on GitHub; you can follow the GitHub link given earlier to read the full Readme:
Below are basic TCP server and client examples; please read the HTTP example on your own.
TCP server:
server.cc
#include <evpp/tcp_server.h>
#include <evpp/buffer.h>
#include <evpp/tcp_conn.h>
void OnConnection(const evpp::TCPConnPtr& conn) {
    if (conn->IsConnected()) {
        conn->SetTCPNoDelay(true);
    }
}

void OnMessage(const evpp::TCPConnPtr& conn,
               evpp::Buffer* msg) {
    conn->Send(msg);
}

int main(int argc, char* argv[]) {
    std::string addr = "0.0.0.0:9099";
    int thread_num = 4;

    if (argc != 1 && argc != 3) {
        printf("Usage: %s <port> <thread-num>\n", argv[0]);
        printf(" e.g: %s 9099 12\n", argv[0]);
        return 0;
    }

    if (argc == 3) {
        addr = std::string("0.0.0.0:") + argv[1];
        thread_num = atoi(argv[2]);
    }

    evpp::EventLoop loop;
    evpp::TCPServer server(&loop, addr, "TCPPingPongServer", thread_num);
    server.SetMessageCallback(&OnMessage);
    server.SetConnectionCallback(&OnConnection);
    server.Init();
    server.Start();
    loop.Run();
    return 0;
}
Client:
client_fixed_size.cc
// Modified from https://github.com/chenshuo/muduo/blob/master/examples/pingpong/client.cc
//
// Every time, we need to receive the whole message and then we can send the next one.
//
#include <iostream>
#include <evpp/tcp_client.h>
#include <evpp/event_loop_thread_pool.h>
#include <evpp/buffer.h>
#include <evpp/tcp_conn.h>
class Client;
class Session {
public:
    Session(evpp::EventLoop* loop,
            const std::string& serverAddr/*ip:port*/,
            const std::string& name,
            size_t block_size,
            Client* owner)
        : client_(loop, serverAddr, name),
          owner_(owner),
          bytes_read_(0),
          bytes_written_(0),
          messages_read_(0),
          block_size_(block_size) {
        client_.SetConnectionCallback(
            std::bind(&Session::OnConnection, this, std::placeholders::_1));
        client_.SetMessageCallback(
            std::bind(&Session::OnMessage, this, std::placeholders::_1, std::placeholders::_2));
    }

    void Start() {
        client_.Connect();
    }

    void Stop() {
        client_.Disconnect();
    }

    int64_t bytes_read() const {
        return bytes_read_;
    }

    int64_t messages_read() const {
        return messages_read_;
    }

private:
    void OnConnection(const evpp::TCPConnPtr& conn);

    void OnMessage(const evpp::TCPConnPtr& conn, evpp::Buffer* buf) {
        ++messages_read_;
        while (buf->size() >= block_size_) {
            bytes_read_ += block_size_;
            bytes_written_ += block_size_;
            conn->Send(buf->data(), block_size_);
            buf->Skip(block_size_);
        }
    }

private:
    evpp::TCPClient client_;
    Client* owner_;
    int64_t bytes_read_;
    int64_t bytes_written_;
    int64_t messages_read_;
    size_t block_size_;
};
class Client {
public:
    Client(evpp::EventLoop* loop,
           const std::string& name,
           const std::string& serverAddr, // ip:port
           int blockSize,
           int sessionCount,
           int timeout_sec,
           int threadCount)
        : loop_(loop),
          name_(name),
          session_count_(sessionCount),
          timeout_(timeout_sec),
          connected_count_(0) {
        loop->RunAfter(evpp::Duration(double(timeout_sec)), std::bind(&Client::HandleTimeout, this));

        tpool_.reset(new evpp::EventLoopThreadPool(loop, threadCount));
        tpool_->Start(true);

        for (int i = 0; i < blockSize; ++i) {
            message_.push_back(static_cast<char>(i % 128));
        }

        for (int i = 0; i < sessionCount; ++i) {
            char buf[32];
            snprintf(buf, sizeof buf, "C%05d", i);
            Session* session = new Session(tpool_->GetNextLoop(), serverAddr, buf, blockSize, this);
            session->Start();
            sessions_.push_back(session);
        }
    }

    ~Client() {
    }

    const std::string& message() const {
        return message_;
    }

    void OnConnect() {
        if (++connected_count_ == session_count_) {
            std::cout << "all connected" << std::endl;
        }
    }

    void OnDisconnect(const evpp::TCPConnPtr& conn) {
        if (--connected_count_ == 0) {
            std::cout << "all disconnected" << std::endl;

            int64_t totalBytesRead = 0;
            int64_t totalMessagesRead = 0;
            for (auto &it : sessions_) {
                totalBytesRead += it->bytes_read();
                totalMessagesRead += it->messages_read();
            }
            std::cout << "name=" << name_ << " " << totalBytesRead << " total bytes read" << std::endl;
            std::cout << "name=" << name_ << " " << totalMessagesRead << " total messages read" << std::endl;
            std::cout << "name=" << name_ << " " << static_cast<double>(totalBytesRead) / static_cast<double>(totalMessagesRead) << " average message size" << std::endl;
            std::cout << "name=" << name_ << " " << static_cast<double>(totalBytesRead) / (timeout_ * 1024 * 1024) << " MiB/s throughput" << std::endl;
            loop_->QueueInLoop(std::bind(&Client::Quit, this));
        }
    }

private:
    void Quit() {
        tpool_->Stop();
        loop_->Stop();
        for (auto &it : sessions_) {
            delete it;
        }
        sessions_.clear();
        while (!tpool_->IsStopped() || !loop_->IsStopped()) {
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        tpool_.reset();
    }

    void HandleTimeout() {
        std::cout << "stop" << std::endl;
        for (auto &it : sessions_) {
            it->Stop();
        }
    }

private:
    evpp::EventLoop* loop_;
    std::string name_;
    std::shared_ptr<evpp::EventLoopThreadPool> tpool_;
    int session_count_;
    int timeout_;
    std::vector<Session*> sessions_;
    std::string message_;
    std::atomic<int> connected_count_;
};
void Session::OnConnection(const evpp::TCPConnPtr& conn) {
    if (conn->IsConnected()) {
        conn->SetTCPNoDelay(true);
        conn->Send(owner_->message());
        owner_->OnConnect();
    } else {
        owner_->OnDisconnect(conn);
    }
}

int main(int argc, char* argv[]) {
    if (argc != 7) {
        fprintf(stderr, "Usage: client <host_ip> <port> <threads> <blocksize> <sessions> <time_seconds>\n");
        return -1;
    }

    const char* ip = argv[1];
    uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
    int threadCount = atoi(argv[3]);
    int blockSize = atoi(argv[4]);
    int sessionCount = atoi(argv[5]);
    int timeout = atoi(argv[6]);

    evpp::EventLoop loop;
    std::string serverAddr = std::string(ip) + ":" + std::to_string(port);

    Client client(&loop, argv[0], serverAddr, blockSize, sessionCount, timeout, threadCount);
    loop.Run();
    return 0;
}
server.cc is very simple: it sends back whatever it receives, much like an echo server. client_fixed_size.cc is more involved: it pushes down a block of data and, once the server echoes it back, keeps the exchange going and finally reports the results. It is a ping-pong performance test; the source lives under evpp/benchmark/throughput/evpp/. You can see that the classes are used in exactly the same way as in muduo: C++ inheritance is set aside in favor of registering callbacks with "std::function + std::bind" or lambdas. The author clearly pays homage to the muduo library here.
You can open two shell terminals under Ubuntu and run, respectively:
./server 9099 200
and
./client_fixed_size 127.0.0.1 9099 150 512 100 10
#all disconnected
#name=./client_fixed_size 357193728 total bytes read
#name=./client_fixed_size 697644 total messages read
#name=./client_fixed_size 512 average message size
#name=./client_fixed_size 34.0646 MiB/s throughput
From the client and server code above you can see that evpp wraps two commonly used interfaces: SetConnectionCallback and SetMessageCallback. What these two callbacks handle is the so-called "application layer", while the library is the lower layer; when writing an application you do not need to care about the underlying details. In other words, evpp separates the "user logic" from the "tedious TCP socket read/write handling". The Linux kernel also makes heavy use of this register-a-callback-into-the-framework idea to achieve "object orientation" and "separation of mechanism and policy".
1. When the TCPClient establishes a connection, an existing connection breaks down, or an attempt to establish a connection fails, the callback we registered is invoked (a sketch that tells these states apart follows the two excerpts below):
// Set a connection event relative callback when the TCPClient
// establishes a connection or an exist connection breaks down or failed to establish a connection.
// When these three events happened, the value of the parameter in the callback is:
// 1. Successfully establish a connection : TCPConn::IsConnected() == true
// 2. An exist connection broken down : TCPConn::IsDisconnecting() == true
// 3. Failed to establish a connection : TCPConn::IsDisconnected() == true and TCPConn::fd() == -1
void TCPClient::SetConnectionCallback(const ConnectionCallback& cb);
// Set the message callback to handle the messages from remote server
void TCPClient::SetMessageCallback(const MessageCallback& cb);
2. When the TCPServer receives a new connection or an existing connection breaks down, the callback we registered is invoked:
// Set a connection event relative callback when the TCPServer
// receives a new connection or an exist connection breaks down.
// When these two events happened, the value of the parameter in the callback is:
// 1. Received a new connection : TCPConn::IsConnected() == true
// 2. An exist connection broken down : TCPConn::IsDisconnecting() == true
void TCPServer::SetConnectionCallback(const ConnectionCallback& cb);
// Set the message callback to handle the messages from remote client
void TCPServer::SetMessageCallback(MessageCallback cb);
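To make the three client-side states listed above concrete, here is a minimal sketch (not taken from the evpp examples) of a TCPClient connection callback that distinguishes them. It only uses the predicates quoted from the header comment plus AddrToString() and LOG_INFO, which appear in the chatroom example further below:
void OnClientConnection(const evpp::TCPConnPtr& conn) {
    if (conn->IsConnected()) {
        // 1. the connection was established successfully
        LOG_INFO << "connected to " << conn->AddrToString();
    } else if (conn->IsDisconnecting()) {
        // 2. an existing connection is breaking down
        LOG_INFO << "connection " << conn->AddrToString() << " is going down";
    } else {
        // 3. IsDisconnected() and fd() == -1: the connect attempt itself failed
        LOG_INFO << "failed to establish the connection";
    }
}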
Loosely speaking, SetConnectionCallback handles the business of a connection coming up or going down. For example, when the connection is established you can save the evpp::TCPConnPtr (a smart pointer) for sending data later, and when the connection drops you reset that TCPConnPtr so it becomes nullptr. See the simple chat app under evpp/examples/chatroom/simple (client.cc, codec.h and server.cc):
// See evpp/examples/chatroom/simple/client.cc
std::mutex mutex_;
evpp::TCPConnPtr connection_;
...
...
void OnConnection(const evpp::TCPConnPtr& conn)
{
    LOG_INFO << conn->AddrToString() << " is " << (conn->IsConnected() ? "UP" : "DOWN");

    std::lock_guard<std::mutex> lock(mutex_);
    if (conn->IsConnected())
    {
        connection_ = conn;
    }
    else
    {
        connection_.reset();
    }
}

void Write(const evpp::Slice& message)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (connection_)
    {
        evpp::Buffer buf;
        buf.Append(message.data(), message.size());
        buf.PrependInt32(message.size());
        connection_->Send(&buf);
    }
}
...
...
std::string line = "hello world";
Write(line);
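The receiving side of this small length-prefixed protocol lives in codec.h: the message callback keeps peeling 4-byte-length-prefixed frames off the evpp::Buffer until the remaining data is incomplete. A sketch along those lines is shown below; PeekInt32() and Close() are assumed from evpp's muduo-style Buffer/TCPConn API, and the real implementation is the one in codec.h:
void OnStringMessage(const evpp::TCPConnPtr& conn, evpp::Buffer* buf)
{
    const size_t kHeaderLen = sizeof(int32_t);  // the 4 bytes written by PrependInt32()
    while (buf->size() >= kHeaderLen)
    {
        const int32_t len = buf->PeekInt32();   // read the length prefix without consuming it (assumed API)
        if (len < 0 || len > 65536)
        {
            conn->Close();                      // defend against a corrupted header (assumed API)
            break;
        }
        if (buf->size() < kHeaderLen + static_cast<size_t>(len))
        {
            break;                              // the frame is not complete yet, wait for more data
        }
        buf->Skip(kHeaderLen);
        std::string message(buf->data(), len);  // copy the payload out ...
        buf->Skip(len);                         // ... and consume it
        LOG_INFO << "received: " << message;    // hand the message to the application here
    }
}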
SetMessageCallback, on the other hand, handles messages arriving from the peer; the data is read out of the evpp::Buffer object. Because the Reactor pattern is used, none of these callbacks may block. If you need to delay some processing, do not call sleep(); register a timer instead, as sketched below. Similarly, for work that would block, you can build your own task queue plus thread pool. These are general programming techniques, though, and have nothing to do with the evpp library itself.
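As an illustration of "register a timer instead of sleeping", here is a minimal sketch of a delayed echo inside a message callback. It reuses the EventLoop::RunAfter() and evpp::Duration calls seen in client_fixed_size.cc above; conn->loop() (the loop owning this connection) is an assumption here, not something shown in the examples:
void OnMessage(const evpp::TCPConnPtr& conn, evpp::Buffer* msg) {
    // Copy the payload out of the Buffer and consume it.
    std::string request(msg->data(), msg->size());
    msg->Skip(msg->size());

    // Never sleep() here: that would stall this loop thread and every other
    // connection it serves. Schedule a one-shot timer instead and reply later.
    conn->loop()->RunAfter(evpp::Duration(2.0), [conn, request]() {
        conn->Send(request);  // delayed echo, two seconds after the request arrived
    });
}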
How are the TCPClient and TCPServer classes meant to be used?
The author spells it out in tcp_client.h (a minimal sketch putting the steps together follows this list):
1. Create a TCPClient object
2. Set the callbacks with SetConnectionCallback() and SetMessageCallback()
3. Call TCPClient::Connect() to try to establish a TCP connection with the remote server
4. Use TCPClient::Send() to send messages
5. Handle the "connection" and the "messages" in the callbacks
6. To close the connection, simply call TCPClient::Disconnect()
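Putting those six steps together, a minimal sketch of a TCPClient talking to the echo server shown earlier might look like the following; the address, the client name, and the shutdown handling are illustrative choices for this sketch, not taken from the evpp examples:
#include <iostream>
#include <string>

#include <evpp/event_loop.h>
#include <evpp/tcp_client.h>
#include <evpp/buffer.h>
#include <evpp/tcp_conn.h>

int main() {
    evpp::EventLoop loop;

    // Step 1: create the TCPClient. "127.0.0.1:9099" matches the echo server above.
    evpp::TCPClient client(&loop, "127.0.0.1:9099", "MinimalEchoClient");

    // Step 2: register the callbacks before connecting.
    client.SetConnectionCallback([&loop](const evpp::TCPConnPtr& conn) {
        if (conn->IsConnected()) {
            conn->Send(std::string("hello evpp"));  // step 4: send once the connection is up
        } else {
            loop.Stop();                            // disconnected (or connect failed): leave the loop
        }
    });
    client.SetMessageCallback([&client](const evpp::TCPConnPtr& conn, evpp::Buffer* msg) {
        // Step 5: handle the echoed message.
        std::cout << "echo: " << std::string(msg->data(), msg->size()) << std::endl;
        msg->Skip(msg->size());
        client.Disconnect();                        // step 6: we are done, close the connection
    });

    client.Connect();                               // step 3: try to establish the connection
    loop.Run();                                     // the callbacks above run inside this loop
    return 0;
}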
tcp_server.h carries a similar description:
1. Create a TCPServer object
2. Set the callbacks with SetConnectionCallback() and SetMessageCallback()
3. Call TCPServer::Init()
4. Call TCPServer::Start()
5. Handle the "connection" and the "messages" in the callbacks
6. Finally, to shut the server down, simply call TCPServer::Stop()
You can check the usage described above against the examples under evpp/examples/*.
Before analyzing the code, you first have to know how to use it!