uSockets主要数据结构:
/* Poll object: bundles a uv_poll_t with the descriptor it belongs to and a one-byte poll type tag. */
struct us_poll_t {
uv_poll_t uv_p;
LIBUS_SOCKET_DESCRIPTOR fd;// essentially a file descriptor with the platform differences smoothed over
unsigned char poll_type;
};
/* Internal callback types are polls just like sockets */
struct us_internal_callback_t {
/* Embedded poll, aligned to LIBUS_EXT_ALIGNMENT; being the first member it shares the struct's address */
alignas(LIBUS_EXT_ALIGNMENT) struct us_poll_t p;
/* Loop this callback was created for */
struct us_loop_t *loop;
/* NOTE(review): presumably selects whether cb should be handed the loop instead of this struct -- confirm at the call site */
int cb_expects_the_loop;
/* Function to invoke; it receives a pointer to a us_internal_callback_t */
void (*cb)(struct us_internal_callback_t *cb);
};
/*
 * Creates a timer bound to loop.
 *
 * A single heap block is allocated that is large enough for the callback
 * header (us_internal_callback_t), a uv_timer_t placed directly behind it,
 * and ext_size additional bytes.
 *
 * loop        - loop whose uv_loop the timer is initialised on
 * fallthrough - when non-zero the libuv handle is unreferenced with uv_unref
 * ext_size    - number of extra bytes to reserve in the allocation
 *
 * Returns the block cast to us_timer_t *, or NULL if the allocation fails.
 * The callback function pointer (cb->cb) is not assigned here.
 */
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
    struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + sizeof(uv_timer_t) + ext_size);
    if (!cb) {
        /* Out of memory: report failure instead of writing through a null pointer */
        return NULL;
    }

    cb->loop = loop;
    cb->cb_expects_the_loop = 0;

    /* The libuv handle sits immediately after the callback header */
    uv_timer_t *uv_timer = (uv_timer_t *) (cb + 1);
    uv_timer_init(loop->uv_loop, uv_timer);
    /* Back-pointer so the header can be recovered from the libuv handle */
    uv_timer->data = cb;

    if (fallthrough) {
        /* NOTE(review): per libuv, an unreferenced handle should not keep the loop running -- confirm */
        uv_unref((uv_handle_t *) uv_timer);
    }

    return (struct us_timer_t *) cb;
}
// async (internal only)
/*
 * Allocates the storage for an internal async object bound to loop.
 *
 * The block holds the callback header (us_internal_callback_t), room for a
 * uv_async_t and ext_size additional bytes. Only the loop pointer is set
 * here; the uv_async_t is not initialised and fallthrough is not used in
 * this function.
 *
 * Returns the block cast to us_internal_async *, or NULL if the allocation
 * fails.
 */
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
    struct us_internal_callback_t *cb = malloc(sizeof(struct us_internal_callback_t) + sizeof(uv_async_t) + ext_size);
    if (!cb) {
        /* Out of memory: report failure instead of writing through a null pointer */
        return NULL;
    }

    cb->loop = loop;

    return (struct us_internal_async *) cb;
}
/* Per-loop bookkeeping; embedded as the first member (data) of struct us_loop_t. */
struct us_internal_loop_data_t {
struct us_timer_t *sweep_timer;// us_timer_t is an opaque pointer type; no definition is given
struct us_internal_async *wakeup_async;// opaque pointer type; no definition is given
int last_write_failed;
/* head/iterator: NOTE(review): presumably the list of socket contexts on this loop and a cursor into it -- confirm */
struct us_socket_context_t *head;
struct us_socket_context_t *iterator;
char *recv_buf;
void *ssl_data;
/* Hooks that take the loop as their only argument */
void (*pre_cb)(struct us_loop_t *);
void (*post_cb)(struct us_loop_t *);
struct us_socket_t *closed_head;
/* We do not care if this flips or not, it doesn't matter */
long long iteration_nr;
};
/* Loop object for the libuv backend: internal loop data followed by the libuv handles it uses. */
struct us_loop_t {
/* Internal bookkeeping, aligned to LIBUS_EXT_ALIGNMENT and placed first */
alignas(LIBUS_EXT_ALIGNMENT) struct us_internal_loop_data_t data;
uv_loop_t *uv_loop;
/* NOTE(review): presumably non-zero when uv_loop is libuv's default loop -- confirm where it is set */
int is_default;
uv_prepare_t *uv_pre;
uv_check_t *uv_check;
};
// This does not seem to be used much; it only shows up as a parameter being passed around in context.c
/* Options for a socket context: file names for key, certificate, DH parameters and CA, a passphrase, and a memory-usage preference flag. */
struct us_socket_context_options_t {
const char *key_file_name;
const char *cert_file_name;
const char *passphrase;
const char *dh_params_file_name;
const char *ca_file_name;
int ssl_prefer_low_memory_usage;
};
/* Socket object: a poll (first member, aligned to LIBUS_EXT_ALIGNMENT) plus its owning context, list links and a timeout value. */
struct us_socket_t {
alignas(LIBUS_EXT_ALIGNMENT) struct us_poll_t p;
struct us_socket_context_t *context;
struct us_socket_t *prev, *next;// a linked list is hidden here (prev/next links)
/* NOTE(review): unit and encoding of timeout are not visible in this excerpt */
unsigned short timeout;
};
/* Socket context: the loop it belongs to, its sockets, links to sibling contexts and the event callbacks. */
struct us_socket_context_t {
alignas(LIBUS_EXT_ALIGNMENT) struct us_loop_t *loop;
//unsigned short timeout;
/* head/iterator: NOTE(review): presumably the list of sockets in this context and a cursor into it -- confirm */
struct us_socket_t *head;
struct us_socket_t *iterator;
struct us_socket_context_t *prev, *next;// a linked list is hidden here (prev/next links)
// Important callbacks
/* Every callback below except ignore_data takes a socket and returns a socket pointer */
struct us_socket_t *(*on_open)(struct us_socket_t *, int is_client, char *ip, int ip_length);
struct us_socket_t *(*on_data)(struct us_socket_t *, char *data, int length);
struct us_socket_t *(*on_writable)(struct us_socket_t *);
struct us_socket_t *(*on_close)(struct us_socket_t *);
//void (*on_timeout)(struct us_socket_context *);
struct us_socket_t *(*on_socket_timeout)(struct us_socket_t *);
struct us_socket_t *(*on_end)(struct us_socket_t *);
int (*ignore_data)(struct us_socket_t *);
};
uWebSockets 主要数据结构:
App.h:
namespace uWS {

    /* Application object. The SSL flag is forwarded to every context type it holds;
     * App and SSLApp below are the two instantiations. */
    template <bool SSL>
    struct TemplatedApp {
    private:
        /* The app always owns at least one http context, but creates websocket contexts on demand */
        HttpContext<SSL> *httpContext;
        std::vector<WebSocketContext<SSL, true> *> webSocketContexts;

        /* Settings and event handlers describing how a WebSocket route behaves */
        struct WebSocketBehavior {
            CompressOptions compression = DISABLED;
            int maxPayloadLength = 16 * 1024;
            int idleTimeout = 120;
            /* 1024 * 1024 (the last factor used to read 1204, a digit transposition).
             * NOTE(review): presumably a byte count -- confirm against the code that enforces it */
            int maxBackpressure = 1 * 1024 * 1024;

            /* Handlers default to nullptr, i.e. empty */
            fu2::unique_function<void(uWS::WebSocket<SSL, true> *, HttpRequest *)> open = nullptr;
            fu2::unique_function<void(uWS::WebSocket<SSL, true> *, std::string_view, uWS::OpCode)> message = nullptr;
            fu2::unique_function<void(uWS::WebSocket<SSL, true> *)> drain = nullptr;
            fu2::unique_function<void(uWS::WebSocket<SSL, true> *)> ping = nullptr;
            fu2::unique_function<void(uWS::WebSocket<SSL, true> *)> pong = nullptr;
            fu2::unique_function<void(uWS::WebSocket<SSL, true> *, int, std::string_view)> close = nullptr;
        };
    };

    typedef TemplatedApp<false> App;
    typedef TemplatedApp<true> SSLApp;

}
AsyncSocket.h:
namespace uWS {

    template <bool, bool> struct WebSocketContext;

    /* Handle type whose `this` pointer is treated as a us_socket_t pointer by the accessors below. */
    template <bool SSL>
    struct AsyncSocket {
        template <bool> friend struct HttpContext;
        template <bool, bool> friend struct WebSocketContext;
        template <bool> friend struct WebSocketContextData;
        friend struct TopicTree;

    protected:
        /* Resolve socket -> context -> loop, then return the loop's extension area as LoopData */
        LoopData *getLoopData() {
            auto owningContext = us_socket_context(SSL, (us_socket_t *) this);
            auto owningLoop = us_socket_context_loop(SSL, owningContext);
            return (LoopData *) us_loop_ext(owningLoop);
        }

        /* Return this socket's extension area as AsyncSocketData */
        AsyncSocketData<SSL> *getAsyncSocketData() {
            auto socketExt = us_socket_ext(SSL, (us_socket_t *) this);
            return (AsyncSocketData<SSL> *) socketExt;
        }
    };

}
AsyncSocketData.h:
namespace uWS {

    /* Depending on how we want AsyncSocket to function, this will need to change */
    template <bool SSL>
    struct AsyncSocketData {
        /* Buffered data; a std::string will do for now */
        std::string buffer;

        /* Start out with an empty buffer */
        AsyncSocketData() = default;

        /* Or take over an existing backpressure string by moving it into the buffer */
        AsyncSocketData(std::string &&backpressure)
            : buffer{std::move(backpressure)} {}
    };

}
HttpContext.h:
namespace uWS {
template<bool> struct HttpResponse;
/* Handle type for an HTTP socket context. It cannot be constructed (the default constructor is
 * deleted); the accessors below reinterpret `this` as a us_socket_context_t pointer.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
template <bool SSL>
struct HttpContext {
template<bool> friend struct TemplatedApp;
private:
HttpContext() = delete;
/* Maximum delay allowed until an HTTP connection is terminated due to outstanding request or rejected data (slow loris protection) */
static const int HTTP_IDLE_TIMEOUT_S = 10;
/* Returns this object viewed as a us_socket_context_t */
us_socket_context_t *getSocketContext() {
return (us_socket_context_t *) this;
}
/* Returns what us_socket_context(SSL, s) yields for socket s, as a us_socket_context_t pointer */
static us_socket_context_t *getSocketContext(us_socket_t *s) {
return (us_socket_context_t *) us_socket_context(SSL, s);
}
/* Returns this context's extension area as HttpContextData */
HttpContextData<SSL> *getSocketContextData() {
return (HttpContextData<SSL> *) us_socket_context_ext(SSL, getSocketContext());
}
/* Static variant: returns the extension area of the context obtained from socket s */
static HttpContextData<SSL> *getSocketContextDataS(us_socket_t *s) {
return (HttpContextData<SSL> *) us_socket_context_ext(SSL, getSocketContext(s));
}
HttpContextData.h:
namespace uWS {
template<bool> struct HttpResponse;
struct HttpRequest;
/* Data attached to an HTTP context, 16-byte aligned. All members are private;
 * HttpContext and HttpResponse are friends. */
template <bool SSL>
struct alignas(16) HttpContextData {
template <bool> friend struct HttpContext;
template <bool> friend struct HttpResponse;
private:
/* Callables taking a response pointer and an int */
std::vector<fu2::unique_function<void(HttpResponse<SSL> *, int)>> filterHandlers;
/* User data type the router is instantiated with: the response/request pair */
struct RouterData {
HttpResponse<SSL> *httpResponse;
HttpRequest *httpRequest;
};
HttpRouter<RouterData> router;
/* Untyped pointer, null by default. NOTE(review): presumably set when a connection is upgraded to a WebSocket -- confirm */
void *upgradedWebSocket = nullptr;
};
}
HttpParser.h:
namespace uWS {
/* We require at least this much post padding */
static const int MINIMUM_HTTP_POST_PADDING = 32;
/* A parsed request: a fixed-capacity table of header key/value views plus parsing bookkeeping.
 * All members are private; HttpParser is a friend. */
struct HttpRequest {
friend struct HttpParser;
private:
/* Capacity of the headers array */
const static int MAX_HEADERS = 50;
/* Header names and values are string views, so they do not own their text */
struct Header {
std::string_view key, value;
} headers[MAX_HEADERS];
int querySeparator;
bool didYield;
/* An int paired with a pointer to string views; NOTE(review): presumably top index and parameter array from the router -- confirm */
std::pair<int, std::string_view *> currentParameters;
};
/* NOTE: this excerpt ends before HttpParser and the namespace are closed. */
struct HttpParser {
private:
std::string fallback;
unsigned int remainingStreamingBytes = 0;
/* 4 KiB; NOTE(review): presumably the upper bound for the fallback buffer -- confirm where it is checked */
const size_t MAX_FALLBACK_SIZE = 1024 * 4;
HttpResponseData.h:
namespace uWS {
/* Per-response data. Inherits the buffer of AsyncSocketData and the state of HttpParser;
 * its own members are private, with HttpResponse and HttpContext as friends. */
template <bool SSL>
struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {
template <bool> friend struct HttpResponse;
template <bool> friend struct HttpContext;
private:
/* Bits of status */
enum {
HTTP_STATUS_CALLED = 1, // used
HTTP_WRITE_CALLED = 2, // used
HTTP_END_CALLED = 4, // used
HTTP_RESPONSE_PENDING = 8, // used
HTTP_ENDED_STREAM_OUT = 16 // not used
};
/* Per socket event handlers */
fu2::unique_function<bool(int)> onWritable;
fu2::unique_function<void()> onAborted;
fu2::unique_function<void(std::string_view, bool)> inStream; // onData
/* Outgoing offset */
int offset = 0;
/* Current state (content-length sent, status sent, write called, etc.); starts at 0, i.e. no status bit set */
int state = 0;
};
}
HttpResponse.h:
namespace uWS {
/* Some pre-defined status constants to use with writeStatus */
static const char *HTTP_200_OK = "200 OK";
/* The general timeout for HTTP sockets */
static const int HTTP_TIMEOUT_S = 10;
/* HTTP response handle built on AsyncSocket.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
template <bool SSL>
struct HttpResponse : public AsyncSocket<SSL> {
/* Solely used for getHttpResponseData() */
template <bool> friend struct TemplatedApp;
typedef AsyncSocket<SSL> Super;
private:
/* Returns the socket's AsyncSocketData pointer cast to HttpResponseData */
HttpResponseData<SSL> *getHttpResponseData() {
return (HttpResponseData<SSL> *) Super::getAsyncSocketData();
}
/* Immediately terminate this Http response */
using Super::close;
using Super::getRemoteAddress;
HttpRouter.h:
namespace uWS {
/* URL router; USERDATA is the type passed by reference to every handler.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
template <typename USERDATA>
struct HttpRouter {
private:
/* Capacity of both the parameter stack and the URL segment array */
static const unsigned int MAX_URL_SEGMENTS = 100;
/* Basically a pre-allocated stack */
struct RouteParameters {
friend struct HttpRouter;
private:
std::string_view params[MAX_URL_SEGMENTS];
/* Index of the top element; reset() puts it at -1 */
int paramsTop;
/* Empties the stack */
void reset() {
paramsTop = -1;
}
/* Pushes param onto the stack; performs no bounds check of its own */
void push(std::string_view param) {
/* We check these bounds indirectly via the urlSegments limit */
params[++paramsTop] = param;
}
/* Drops the top element; performs no bounds check of its own */
void pop() {
/* Same here, we cannot pop outside */
paramsTop--;
}
} routeParameters;
/* Handlers take the user data and an (int, string_view pointer) pair and return bool */
std::vector<fu2::unique_function<bool(USERDATA &, std::pair<int, std::string_view *>)>> handlers;
/* Copy construction is disabled */
HttpRouter(const HttpRouter &other) = delete;
/* Tree node: a name, child nodes and a handler number */
struct Node {
std::string name;
std::vector<Node *> children;
short handler = 0; // unhandled
} tree;
std::string_view currentUrl;
std::string_view urlSegmentVector[MAX_URL_SEGMENTS];
int urlSegmentTop;
LoopData.h:
namespace uWS {
struct Loop;
/* Data attached to a loop, 16-byte aligned: deferred callbacks, pre/post handlers,
 * the cork buffer and the compression objects. Loop is a friend. */
struct alignas(16) LoopData {
friend struct Loop;
private:
std::mutex deferMutex;
/* Index into deferQueues (the array has two entries) */
int currentDeferQueue = 0;
std::vector<fu2::unique_function<void()>> deferQueues[2];
/* Map from void ptr to handler */
std::map<void *, fu2::unique_function<void(Loop *)>> postHandlers, preHandlers;
public:
/* Releases the compression objects (only when zlibContext is set) and the cork buffer */
~LoopData() {
/* If we have had App.ws called with compression we need to clear this */
if (zlibContext) {
delete zlibContext;
delete inflationStream;
delete deflationStream;
}
delete [] corkBuffer;
}
/* Good 16k for SSL perf. */
static const int CORK_BUFFER_SIZE = 16 * 1024;
/* Cork data */
/* The buffer is allocated as soon as a LoopData is constructed and freed in the destructor */
char *corkBuffer = new char[CORK_BUFFER_SIZE];
int corkOffset = 0;
void *corkedSocket = nullptr;
/* Per message deflate data */
ZlibContext *zlibContext = nullptr;
InflationStream *inflationStream = nullptr;
DeflationStream *deflationStream = nullptr;
};
}
Loop.h:
namespace uWS {
/* Handle type whose `this` pointer is treated as a us_loop_t pointer.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
struct Loop {
private:
/* Constructs a LoopData in place inside the loop's extension area and returns this */
Loop *init() {
new (us_loop_ext((us_loop_t *) this)) LoopData;
return this;
}
/* Creates the underlying loop with sizeof(LoopData) bytes of extension space, then initialises it */
static Loop *create(void *hint) {
return ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init();
}
public:
/* Lazily initializes a per-thread loop and returns it.
 * Will automatically free all initialized loops at exit. */
static Loop *get(void *existingNativeLoop = nullptr) {
/* One holder per thread; the loop is created on the first call only */
static thread_local LoopCleaner lazyLoop;
if (!lazyLoop.loop) {
/* If we are given a native loop pointer we pass that to uSockets and let it deal with it */
if (existingNativeLoop) {
/* Todo: here we want to pass the pointer, not a boolean */
lazyLoop.loop = create(existingNativeLoop);
/* We cannot register automatic free here, must be manually done */
} else {
lazyLoop.loop = create(nullptr);
/* Only a loop created here is flagged via cleanMe */
lazyLoop.cleanMe = true;
}
}
return lazyLoop.loop;
}
PerMessageDeflate.h:
namespace uWS {
/* Do not compile this module if we don't want it */
#ifdef UWS_NO_ZLIB
/* Stub types used when UWS_NO_ZLIB is defined: inflate and deflate hand their input back unchanged */
struct ZlibContext {};
struct InflationStream {
/* Returns compressed as-is; zlibContext and maxPayloadLength are ignored */
std::string_view inflate(ZlibContext *zlibContext, std::string_view compressed, size_t maxPayloadLength) {
return compressed;
}
};
struct DeflationStream {
/* Returns raw as-is; zlibContext and reset are ignored */
std::string_view deflate(ZlibContext *zlibContext, std::string_view raw, bool reset) {
return raw;
}
};
#else
/* Parenthesised so the macro expands as one value inside larger expressions (e.g. x % LARGE_BUFFER_SIZE) */
#define LARGE_BUFFER_SIZE (1024 * 16) // todo: fix this
/* Buffers shared by the inflation and deflation code.
 * NOTE: this excerpt omits the rest of each struct below; their closing braces are not shown. */
struct ZlibContext {
/* Any returned data is valid until next same-class call.
 * We need to have two classes to allow inflation followed
 * by many deflations without modifying the inflation */
std::string dynamicDeflationBuffer;
std::string dynamicInflationBuffer;
char *deflationBuffer;
char *inflationBuffer;
/* Allocates both fixed buffers with LARGE_BUFFER_SIZE bytes each; the malloc results are not checked */
ZlibContext() {
deflationBuffer = (char *) malloc(LARGE_BUFFER_SIZE);
inflationBuffer = (char *) malloc(LARGE_BUFFER_SIZE);
}
struct DeflationStream {
z_stream deflationStream = {};
/* Sets up the stream with level 1, windowBits -15 and memLevel 8; the return code is ignored */
DeflationStream() {
deflateInit2(&deflationStream, 1, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
}
struct InflationStream {
z_stream inflationStream = {};
/* Sets up the stream with windowBits -15; the return code is ignored */
InflationStream() {
inflateInit2(&inflationStream, -15);
}
/* Ends the zlib stream */
~InflationStream() {
inflateEnd(&inflationStream);
}
TopicTreeDraft.h:
namespace uWS {
/* A Subscriber is an extension of a socket */
struct Subscriber {
/* Topics this subscriber is subscribed to */
std::list<struct Topic *> subscriptions;
/* Untyped pointer supplied at construction */
void *user;
Subscriber(void *user) : user(user) {}
};
/* One node of the topic tree */
struct Topic {
/* Memory for our name */
char *name;
size_t length;
/* Our parent or nullptr */
Topic *parent = nullptr;
/* Next triggered Topic */
bool triggered = false;
/* Exact string matches */
std::map<std::string_view, Topic *> children;
/* Wildcard child */
Topic *wildcardChild = nullptr;
/* Terminating wildcard child */
Topic *terminatingWildcardChild = nullptr;
/* What we published */
std::map<unsigned int, std::string> messages;
std::set<Subscriber *> subs;
};
/* NOTE: this excerpt ends before TopicTree and the namespace are closed. */
struct TopicTree {
private:
/* Callback taking a subscriber and a string view and returning int */
std::function<int(Subscriber *, std::string_view)> cb;
/* The root topic is allocated together with the tree */
Topic *root = new Topic;
/* Global messageId for deduplication of overlapping topics and ordering between topics */
unsigned int messageId = 0;
/* The triggered topics */
Topic *triggeredTopics[64];
int numTriggeredTopics = 0;
/* Starts at the largest possible pointer value; NOTE(review): presumably a sentinel for a running minimum -- confirm */
Subscriber *min = (Subscriber *) UINTPTR_MAX;
WebSocketContextData.h:
namespace uWS {
template <bool, bool> struct WebSocket;
/* todo: this looks identical to WebSocketBehavior, why not just std::move that entire thing in? */
/* Data attached to a WebSocket context: handlers, limits and the topic tree.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
template <bool SSL>
struct WebSocketContextData {
/* The callbacks for this context */
fu2::unique_function<void(WebSocket<SSL, true> *, std::string_view, uWS::OpCode)> messageHandler = nullptr;
fu2::unique_function<void(WebSocket<SSL, true> *)> drainHandler = nullptr;
fu2::unique_function<void(WebSocket<SSL, true> *, int, std::string_view)> closeHandler = nullptr;
/* Settings for this context */
size_t maxPayloadLength = 0;
int idleTimeout = 0;
/* There needs to be a maxBackpressure which will force close everything over that limit */
size_t maxBackpressure = 0;
/* Each websocket context has a topic tree for pub/sub */
TopicTree topicTree;
WebSocketContext.h:
namespace uWS {
/* Handle type for a WebSocket socket context. It cannot be constructed directly (the default
 * constructor is deleted); instances come from create().
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
template <bool SSL, bool isServer>
struct WebSocketContext {
template <bool> friend struct TemplatedApp;
template <bool, typename> friend struct WebSocketProtocol;
private:
WebSocketContext() = delete;
/* Returns this object viewed as a us_socket_context_t */
us_socket_context_t *getSocketContext() {
return (us_socket_context_t *) this;
}
/* Returns this context's extension area as WebSocketContextData */
WebSocketContextData<SSL> *getExt() {
return (WebSocketContextData<SSL> *) us_socket_context_ext(SSL, (us_socket_context_t *) this);
}
public:
/* WebSocket contexts are always child contexts to a HTTP context so no SSL options are needed as they are inherited */
/* Returns nullptr when the child context cannot be created; loop is not used in the lines shown */
static WebSocketContext *create(Loop *loop, us_socket_context_t *parentSocketContext) {
WebSocketContext *webSocketContext = (WebSocketContext *) us_create_child_socket_context(SSL, parentSocketContext, sizeof(WebSocketContextData<SSL>));
if (!webSocketContext) {
return nullptr;
}
/* Init socket context data */
new ((WebSocketContextData<SSL> *) us_socket_context_ext(SSL, (us_socket_context_t *)webSocketContext)) WebSocketContextData<SSL>;
return webSocketContext->init();
}
WebSocketData.h:
namespace uWS {
/* Per-socket WebSocket data. Inherits the buffer of AsyncSocketData<false> and the parser state
 * of WebSocketState<true>; WebSocketContext and WebSocket are friends.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
struct WebSocketData : AsyncSocketData<false>, WebSocketState<true> {
template <bool, bool> friend struct WebSocketContext;
template <bool, bool> friend struct WebSocket;
private:
std::string fragmentBuffer;
int controlTipLength = 0;
bool isShuttingDown = 0;
/* No default initialiser here; NOTE(review): presumably assigned by a constructor outside this excerpt -- confirm */
enum CompressionStatus : char {
DISABLED,
ENABLED,
COMPRESSED_FRAME
} compressionStatus;
/* We might have a dedicated compressor */
DeflationStream *deflationStream = nullptr;
/* We could be a subscriber */
Subscriber *subscriber = nullptr;
WebSocketExtensions.h:
namespace uWS {
/* Option flags; each value is a distinct bit so they can be combined */
enum Options : unsigned int {
NO_OPTIONS = 0,
PERMESSAGE_DEFLATE = 1,
SERVER_NO_CONTEXT_TAKEOVER = 2, // remove this
CLIENT_NO_CONTEXT_TAKEOVER = 4, // remove this
NO_DELAY = 8,
SLIDING_DEFLATE_WINDOW = 16
};
/* Numeric codes for the extension tokens; NOTE(review): how the numbers are derived is not visible in this excerpt */
enum ExtensionTokens {
TOK_PERMESSAGE_DEFLATE = 1838,
TOK_SERVER_NO_CONTEXT_TAKEOVER = 2807,
TOK_CLIENT_NO_CONTEXT_TAKEOVER = 2783,
TOK_SERVER_MAX_WINDOW_BITS = 2372,
TOK_CLIENT_MAX_WINDOW_BITS = 2348
};
/* Holds which extension parameters were seen.
 * NOTE: this excerpt ends before this struct, the one after it and the namespace are closed. */
struct ExtensionsParser {
private:
int *lastInteger = nullptr;
public:
bool perMessageDeflate = false;
bool serverNoContextTakeover = false;
bool clientNoContextTakeover = false;
int serverMaxWindowBits = 0;
int clientMaxWindowBits = 0;
template <bool isServer>
struct ExtensionsNegotiator {
protected:
int options;
WebSocket.h:
namespace uWS {
/* WebSocket handle built on AsyncSocket; `this` is treated as a us_socket_t pointer.
 * NOTE: this excerpt ends before the struct and the namespace are closed. */
template <bool SSL, bool isServer>
struct WebSocket : AsyncSocket<SSL> {
template <bool> friend struct TemplatedApp;
private:
typedef AsyncSocket<SSL> Super;
/* Constructs a WebSocketData in place inside the socket's extension area and returns this */
void *init(bool perMessageDeflate, bool slidingCompression, std::string &&backpressure) {
new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, slidingCompression, std::move(backpressure));
return this;
}
public:
/* Returns pointer to the per socket user data */
void *getUserData() {
WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);
/* We just have it overallocated by sizeof type */
/* i.e. the user data starts right after the WebSocketData object */
return (webSocketData + 1);
}
/* See AsyncSocket */
using Super::getBufferedAmount;
using Super::getRemoteAddress;
/* Simple, immediate close of the socket. Emits close event */
using Super::close;
WebSocketProtocol.h:
namespace uWS {
/* Frame opcodes, stored in one byte */
enum OpCode : unsigned char {
TEXT = 1,
BINARY = 2,
CLOSE = 8,
PING = 9,
PONG = 10
};
/* Role constants: CLIENT is 0 and SERVER is 1 */
enum {
CLIENT,
SERVER
};
// 24 bytes perfectly
/* Parser state; the size notes in this struct are the original author's and are not verified here */
template <bool isServer>
struct WebSocketState {
public:
/* Header sizes for short, medium and long messages; each is 4 larger when isServer is true */
static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2;
static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4;
static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10;
// 16 bytes
struct State {
unsigned int wantsHead : 1;
unsigned int spillLength : 4;
int opStack : 2; // -1, 0, 1
unsigned int lastFin : 1;
// LONG_MESSAGE_HEADER - 1 bytes: 13 when isServer is true, 9 otherwise (the original note said 15)
unsigned char spill[LONG_MESSAGE_HEADER - 1];
OpCode opCode[2];
/* Initial state: wantsHead and lastFin set, nothing spilled, opStack at -1 */
State() {
wantsHead = true;
spillLength = 0;
opStack = -1;
lastFin = true;
}
} state;
// 8 bytes
unsigned int remainingBytes = 0;
/* 4 bytes when isServer is true, otherwise a 1-byte placeholder */
char mask[isServer ? 4 : 1];
};
namespace protocol {
/* Contents of a close frame: a 16-bit code and a message given as pointer plus length */
struct CloseFrame {
uint16_t code;
char *message;
size_t length;
};
}
// essentially this is only a parser
/* Only the constants are shown in this excerpt; Impl is not used in the lines visible here */
template <const bool isServer, typename Impl>
struct WIN32_EXPORT WebSocketProtocol {
public:
/* Same header sizes as in WebSocketState: each is 4 larger when isServer is true */
static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2;
static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4;
static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10;
static const int CONSUME_POST_PADDING = 4;
/* LONG_MESSAGE_HEADER - 1, i.e. 13 when isServer is true and 9 otherwise */
static const int CONSUME_PRE_PADDING = LONG_MESSAGE_HEADER - 1;
};
}