现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

uWebSockets && uSockets 代码分析(2)主要数据结构

2020-01-02 17:30 工业·编程 ⁄ 共 18054字 ⁄ 字号 暂无评论

uSockets主要数据结构:

/* Poll object of the libuv backend: a libuv poll handle, the descriptor that
 * goes with it, and a one-byte type tag. Other structs in this file embed it
 * as their first member. */
struct us_poll_t {

    uv_poll_t uv_p;

    LIBUS_SOCKET_DESCRIPTOR fd;// essentially an fd type that smooths over platform differences

    unsigned char poll_type;

};

 

/* Internal callback types are polls just like sockets: the struct begins with
 * a us_poll_t, followed by the loop it belongs to and the callback pointer. */

struct us_internal_callback_t {

    alignas(LIBUS_EXT_ALIGNMENT) struct us_poll_t p;

    struct us_loop_t *loop;

    /* Flag; us_create_timer stores 0 here. Presumably tells the dispatcher
     * whether cb should be handed the loop instead of this struct - TODO confirm. */
    int cb_expects_the_loop;

    void (*cb)(struct us_internal_callback_t *cb);

};

 

/* Creates a timer that belongs to `loop`.
 *
 * One allocation holds, in order: the us_internal_callback_t header, the
 * uv_timer_t handle, and `ext_size` bytes of extension space for the caller.
 * The callback pointer (cb->cb) is not set here.
 *
 * loop        - loop whose uv_loop the timer handle is initialized on.
 * fallthrough - when non-zero the handle is unreferenced with uv_unref.
 * ext_size    - number of extra bytes to reserve behind the handle.
 *
 * Returns the timer as an opaque us_timer_t pointer, or NULL when the
 * allocation fails. */
struct us_timer_t *us_create_timer(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) malloc(
        sizeof(struct us_internal_callback_t) + sizeof(uv_timer_t) + ext_size);

    /* Never touch the block if the allocator could not provide it */
    if (!cb) {
        return NULL;
    }

    cb->loop = loop;
    cb->cb_expects_the_loop = 0;

    /* The libuv handle sits directly behind the callback header */
    uv_timer_t *uv_timer = (uv_timer_t *) (cb + 1);
    uv_timer_init(loop->uv_loop, uv_timer);

    /* Back-pointer from the libuv handle to our header */
    uv_timer->data = cb;

    if (fallthrough) {
        uv_unref((uv_handle_t *) uv_timer);
    }

    return (struct us_timer_t *) cb;
}

 

// async (internal only)

/* Allocates the storage for an internal async object of `loop`.
 *
 * One allocation holds the us_internal_callback_t header, room for a
 * uv_async_t, and `ext_size` bytes of extension space. Only the loop pointer
 * is filled in here; the uv_async_t itself is not initialized by this
 * function, and `fallthrough` is not used by it.
 *
 * Returns the object as an opaque us_internal_async pointer, or NULL when the
 * allocation fails. */
struct us_internal_async *us_internal_create_async(struct us_loop_t *loop, int fallthrough, unsigned int ext_size) {
    struct us_internal_callback_t *cb = (struct us_internal_callback_t *) malloc(
        sizeof(struct us_internal_callback_t) + sizeof(uv_async_t) + ext_size);

    /* Never touch the block if the allocator could not provide it */
    if (!cb) {
        return NULL;
    }

    cb->loop = loop;
    return (struct us_internal_async *) cb;
}

 

/* Loop-wide bookkeeping embedded at the start of every us_loop_t. */
struct us_internal_loop_data_t {

    struct us_timer_t *sweep_timer;// us_timer_t is an opaque pointer type (it has no definition)

    struct us_internal_async *wakeup_async;// opaque pointer type, no definition

    int last_write_failed;

    /* Head of, and cursor into, the socket contexts known to this loop */
    struct us_socket_context_t *head;

    struct us_socket_context_t *iterator;

    char *recv_buf;

    void *ssl_data;

    /* Hooks taking the loop; presumably run before and after each loop iteration - TODO confirm */
    void (*pre_cb)(struct us_loop_t *);

    void (*post_cb)(struct us_loop_t *);

    struct us_socket_t *closed_head;

    /* We do not care if this flips or not, it doesn't matter */

    long long iteration_nr;

};

 

/* Event loop of the libuv backend: the shared internal loop data first,
 * followed by the libuv loop and two libuv handles. */
struct us_loop_t {

    alignas(LIBUS_EXT_ALIGNMENT) struct us_internal_loop_data_t data;

 

    uv_loop_t *uv_loop;

    /* NOTE(review): presumably non-zero when uv_loop is libuv's default loop - confirm */
    int is_default;

 

    uv_prepare_t *uv_pre;

    uv_check_t *uv_check;

};

 

// This does not seem to be used much; it only shows up as an argument passed around in context.c

/* Options for a socket context: five file-name / passphrase strings and one
 * integer flag. */
struct us_socket_context_options_t {

    const char *key_file_name;

    const char *cert_file_name;

    const char *passphrase;

    const char *dh_params_file_name;

    const char *ca_file_name;

    int ssl_prefer_low_memory_usage;

};

 

/* A socket: starts with its poll object, then the context it belongs to,
 * its list links and a timeout value. */
struct us_socket_t {

    alignas(LIBUS_EXT_ALIGNMENT) struct us_poll_t p;

    struct us_socket_context_t *context;

    struct us_socket_t *prev, *next;// a linked list is hidden here (links to neighbouring sockets)

    unsigned short timeout;

};

 

/* A socket context: the loop it lives on, its sockets (head plus iteration
 * cursor), links to neighbouring contexts, and the event callbacks that are
 * stored per context. */
struct us_socket_context_t {

    alignas(LIBUS_EXT_ALIGNMENT) struct us_loop_t *loop;

    //unsigned short timeout;

    struct us_socket_t *head;

    struct us_socket_t *iterator;

    struct us_socket_context_t *prev, *next;// a linked list is hidden here (links to neighbouring contexts)

 

    // Important callbacks; all but ignore_data return a us_socket_t pointer

    struct us_socket_t *(*on_open)(struct us_socket_t *, int is_client, char *ip, int ip_length);

    struct us_socket_t *(*on_data)(struct us_socket_t *, char *data, int length);

    struct us_socket_t *(*on_writable)(struct us_socket_t *);

    struct us_socket_t *(*on_close)(struct us_socket_t *);

    //void (*on_timeout)(struct us_socket_context *);

    struct us_socket_t *(*on_socket_timeout)(struct us_socket_t *);

    struct us_socket_t *(*on_end)(struct us_socket_t *);

    int (*ignore_data)(struct us_socket_t *);

};

uWebSockets 主要数据结构

App.h:

namespace uWS {

template <bool SSL>

struct TemplatedApp {

private:

    /* The app always owns at least one http context, but creates websocket contexts on demand */

    HttpContext<SSL> *httpContext;

    std::vector<WebSocketContext<SSL, true> *> webSocketContexts;

 

    struct WebSocketBehavior {

        CompressOptions compression = DISABLED;

        int maxPayloadLength = 16 * 1024;

        int idleTimeout = 120;

        int maxBackpressure = 1 * 1024 * 1204;

        fu2::unique_function<void(uWS::WebSocket<SSL, true> *, HttpRequest *)> open = nullptr;

        fu2::unique_function<void(uWS::WebSocket<SSL, true> *, std::string_view, uWS::OpCode)> message = nullptr;

        fu2::unique_function<void(uWS::WebSocket<SSL, true> *)> drain = nullptr;

        fu2::unique_function<void(uWS::WebSocket<SSL, true> *)> ping = nullptr;

        fu2::unique_function<void(uWS::WebSocket<SSL, true> *)> pong = nullptr;

        fu2::unique_function<void(uWS::WebSocket<SSL, true> *, int, std::string_view)> close = nullptr;

    };

};

 

/* Convenience names for the two instantiations: plain and SSL */
using App = TemplatedApp<false>;
using SSLApp = TemplatedApp<true>;

 

}

AsyncSocket.h:

namespace uWS {

 

    template <bool, bool> struct WebSocketContext;

 

template <bool SSL>

struct AsyncSocket {

    template <bool> friend struct HttpContext;

    template <bool, bool> friend struct WebSocketContext;

    template <bool> friend struct WebSocketContextData;

    friend struct TopicTree;

protected:

    /* Get loop data for socket */

    LoopData *getLoopData() {

        return (LoopData *) us_loop_ext(us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) this)));

    }

 

    /* Get socket extension */

    AsyncSocketData<SSL> *getAsyncSocketData() {

        return (AsyncSocketData<SSL> *) us_socket_ext(SSL, (us_socket_t *) this);

    }

};

 

}

AsyncSocketData.h:

namespace uWS {

 

/* Depending on how we want AsyncSocket to function, this will need to change */

 

template <bool SSL>
struct AsyncSocketData {
    /* Plain string storage; this will do for now */
    std::string buffer;

    /* Starts out with an empty buffer */
    AsyncSocketData() = default;

    /* Takes over an existing backpressure string by moving it into the buffer */
    AsyncSocketData(std::string &&backpressure)
        : buffer(std::move(backpressure)) {}
};

 

}

HttpContext.h:

namespace uWS {

template<bool> struct HttpResponse;

 

template <bool SSL>

struct HttpContext {

    template<bool> friend struct TemplatedApp;

private:

    HttpContext() = delete;

 

    /* Maximum delay allowed until an HTTP connection is terminated due to outstanding request or rejected data (slow loris protection) */

    static const int HTTP_IDLE_TIMEOUT_S = 10;

 

    us_socket_context_t *getSocketContext() {

        return (us_socket_context_t *) this;

    }

 

    static us_socket_context_t *getSocketContext(us_socket_t *s) {

        return (us_socket_context_t *) us_socket_context(SSL, s);

    }

 

    HttpContextData<SSL> *getSocketContextData() {

        return (HttpContextData<SSL> *) us_socket_context_ext(SSL, getSocketContext());

    }

 

    static HttpContextData<SSL> *getSocketContextDataS(us_socket_t *s) {

        return (HttpContextData<SSL> *) us_socket_context_ext(SSL, getSocketContext(s));

    }

HttpContextData.h:

namespace uWS {

template<bool> struct HttpResponse;

struct HttpRequest;

 

/* Data attached to an HTTP context, over-aligned to 16 bytes. Everything is
 * private; HttpContext and HttpResponse reach it through friendship. */
template <bool SSL>

struct alignas(16) HttpContextData {

    template <bool> friend struct HttpContext;

    template <bool> friend struct HttpResponse;

private:

    /* Handlers invoked with a response pointer and an int */
    std::vector<fu2::unique_function<void(HttpResponse<SSL> *, int)>> filterHandlers;

 

    /* User data handed to the router: the response/request pair */
    struct RouterData {

        HttpResponse<SSL> *httpResponse;

        HttpRequest *httpRequest;

    };

 

    HttpRouter<RouterData> router;

    /* NOTE(review): untyped pointer, null by default; presumably set once a socket has been upgraded - confirm */
    void *upgradedWebSocket = nullptr;

};

 

}

HttpParser.h:

namespace uWS {

 

/* We require at least this much post padding */

static const int MINIMUM_HTTP_POST_PADDING = 32;

 

/* State of one HTTP request. All members are private and HttpParser is the
 * only friend declared here. */
struct HttpRequest {

 

    friend struct HttpParser;

 

private:

    /* Fixed capacity of the header table below */
    const static int MAX_HEADERS = 50;

    /* Header table; keys and values are non-owning views */
    struct Header {

        std::string_view key, value;

    } headers[MAX_HEADERS];

    /* NOTE(review): presumably the offset of the query separator within the URL - confirm */
    int querySeparator;

    bool didYield;

 

    /* An int paired with a pointer to string views; same pair type the router passes to its handlers */
    std::pair<int, std::string_view *> currentParameters;

};

 

struct HttpParser {

 

private:

    std::string fallback;

    unsigned int remainingStreamingBytes = 0;

 

    const size_t MAX_FALLBACK_SIZE = 1024 * 4;

HttpResponseData.h:

namespace uWS {

 

/* Data for an HTTP response socket: combines AsyncSocketData and HttpParser
 * with status bits and per-socket handlers. */
template <bool SSL>

struct HttpResponseData : AsyncSocketData<SSL>, HttpParser {

    template <bool> friend struct HttpResponse;

    template <bool> friend struct HttpContext;

private:

    /* Bits of status; each enumerator is a distinct power of two so they can be combined in `state` */

    enum {

        HTTP_STATUS_CALLED = 1, // used

        HTTP_WRITE_CALLED = 2, // used

        HTTP_END_CALLED = 4, // used

        HTTP_RESPONSE_PENDING = 8, // used

        HTTP_ENDED_STREAM_OUT = 16 // not used

    };

 

    /* Per socket event handlers */

    fu2::unique_function<bool(int)> onWritable;

    fu2::unique_function<void()> onAborted;

    fu2::unique_function<void(std::string_view, bool)> inStream; // onData

    /* Outgoing offset */

    int offset = 0;

 

    /* Current state (content-length sent, status sent, write called, etc.) */

    int state = 0;

};

 

}

HttpResponse.h:

namespace uWS {

 

/* Some pre-defined status constants to use with writeStatus */

static const char *HTTP_200_OK = "200 OK";

 

/* The general timeout for HTTP sockets */

static const int HTTP_TIMEOUT_S = 10;

 

template <bool SSL>

struct HttpResponse : public AsyncSocket<SSL> {

    /* Solely used for getHttpResponseData() */

    template <bool> friend struct TemplatedApp;

    typedef AsyncSocket<SSL> Super;

private:

    HttpResponseData<SSL> *getHttpResponseData() {

        return (HttpResponseData<SSL> *) Super::getAsyncSocketData();

    }

    /* Immediately terminate this Http response */

    using Super::close;

 

    using Super::getRemoteAddress;

HttpRouter.h:

namespace uWS {

 

template <typename USERDATA>

struct HttpRouter {

private:

    static const unsigned int MAX_URL_SEGMENTS = 100;

 

    /* Basically a pre-allocated stack */

    struct RouteParameters {

        friend struct HttpRouter;

    private:

        std::string_view params[MAX_URL_SEGMENTS];

        int paramsTop;

 

        void reset() {

            paramsTop = -1;

        }

 

        void push(std::string_view param) {

            /* We check these bounds indirectly via the urlSegments limit */

            params[++paramsTop] = param;

        }

 

        void pop() {

            /* Same here, we cannot pop outside */

            paramsTop--;

        }

    } routeParameters;

 

    std::vector<fu2::unique_function<bool(USERDATA &, std::pair<int, std::string_view *>)>> handlers;

    HttpRouter(const HttpRouter &other) = delete;

 

    struct Node {

        std::string name;

        std::vector<Node *> children;

        short handler = 0; // unhandled

    } tree;

 

    std::string_view currentUrl;

    std::string_view urlSegmentVector[MAX_URL_SEGMENTS];

    int urlSegmentTop;

LoopData.h:

namespace uWS {

 

struct Loop;

 

/* Per-loop data, over-aligned to 16 bytes. The private part is only
 * reachable by Loop (friend); the cork buffer and the zlib objects are public
 * and are released by the destructor. */
struct alignas(16) LoopData {

    friend struct Loop;

private:

    /* Two queues of deferred callbacks, the index of the current one, and a mutex declared next to them */
    std::mutex deferMutex;

    int currentDeferQueue = 0;

    std::vector<fu2::unique_function<void()>> deferQueues[2];

 

    /* Map from void ptr to handler */

    std::map<void *, fu2::unique_function<void(Loop *)>> postHandlers, preHandlers;

 

public:

    ~LoopData() {

        /* If we have had App.ws called with compression we need to clear this */

        /* The two streams are only deleted together with the context, never on their own */
        if (zlibContext) {

            delete zlibContext;

            delete inflationStream;

            delete deflationStream;

        }

        /* Always allocated by the member initializer below */
        delete [] corkBuffer;

    }

 

    /* Good 16k for SSL perf. */

    static const int CORK_BUFFER_SIZE = 16 * 1024;

 

    /* Cork data */

    char *corkBuffer = new char[CORK_BUFFER_SIZE];

    int corkOffset = 0;

    void *corkedSocket = nullptr;

 

    /* Per message deflate data */

    ZlibContext *zlibContext = nullptr;

    InflationStream *inflationStream = nullptr;

    DeflationStream *deflationStream = nullptr;

};

 

}

Loop.h:

namespace uWS {

struct Loop {

private:

    Loop *init() {

        new (us_loop_ext((us_loop_t *) this)) LoopData;

        return this;

    }

 

    static Loop *create(void *hint) {

        return ((Loop *) us_create_loop(hint, wakeupCb, preCb, postCb, sizeof(LoopData)))->init();

    }

public:

    /* Lazily initializes a per-thread loop and returns it.

     * Will automatically free all initialized loops at exit. */

    static Loop *get(void *existingNativeLoop = nullptr) {

        static thread_local LoopCleaner lazyLoop;

        if (!lazyLoop.loop) {

            /* If we are given a native loop pointer we pass that to uSockets and let it deal with it */

            if (existingNativeLoop) {

                /* Todo: here we want to pass the pointer, not a boolean */

                lazyLoop.loop = create(existingNativeLoop);

                /* We cannot register automatic free here, must be manually done */

            } else {

                lazyLoop.loop = create(nullptr);

                lazyLoop.cleanMe = true;

            }

        }

 

        return lazyLoop.loop;

    }

PerMessageDeflate.h:

namespace uWS {

 

/* Do not compile this module if we don't want it */

#ifdef UWS_NO_ZLIB

/* Stand-ins used when zlib support is compiled out: same member signatures,
 * but data passes through untouched. */
struct ZlibContext {};

struct InflationStream {
    /* Hands back the input view unchanged; the other arguments are ignored */
    std::string_view inflate([[maybe_unused]] ZlibContext *zlibContext,
                             std::string_view compressed,
                             [[maybe_unused]] size_t maxPayloadLength) {
        std::string_view passthrough = compressed;
        return passthrough;
    }
};

struct DeflationStream {
    /* Hands back the input view unchanged; the other arguments are ignored */
    std::string_view deflate([[maybe_unused]] ZlibContext *zlibContext,
                             std::string_view raw,
                             [[maybe_unused]] bool reset) {
        std::string_view passthrough = raw;
        return passthrough;
    }
};

#else

 

/* Byte size passed to malloc for each of the fixed zlib scratch buffers.
 * Parenthesized so the macro expands as one value inside larger expressions. */
#define LARGE_BUFFER_SIZE (1024 * 16) // todo: fix this

 

struct ZlibContext {

    /* Any returned data is valid until next same-class call.

     * We need to have two classes to allow inflation followed

     * by many deflations without modifying the inflation */

    std::string dynamicDeflationBuffer;

    std::string dynamicInflationBuffer;

    char *deflationBuffer;

    char *inflationBuffer;

 

    ZlibContext() {

        deflationBuffer = (char *) malloc(LARGE_BUFFER_SIZE);

        inflationBuffer = (char *) malloc(LARGE_BUFFER_SIZE);

    }

 

struct DeflationStream {

    z_stream deflationStream = {};

 

    DeflationStream() {

        deflateInit2(&deflationStream, 1, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);

    }

 

struct InflationStream {

    z_stream inflationStream = {};

 

    InflationStream() {

        inflateInit2(&inflationStream, -15);

    }

 

    ~InflationStream() {

        inflateEnd(&inflationStream);

    }

TopicTreeDraft.h:

namespace uWS {

 

/* A Subscriber is an extension of a socket */
struct Subscriber {
    /* Topics recorded for this subscriber */
    std::list<struct Topic *> subscriptions;

    /* Opaque pointer supplied at construction and stored as-is */
    void *user;

    Subscriber(void *user) : user(user) {}
};

/* One node of the topic tree. Every member has an initializer, so a
 * default-constructed Topic is in a fully defined state. */
struct Topic {
    /* Memory for our name; null / zero until a name is assigned */
    char *name = nullptr;
    size_t length = 0;

    /* Our parent or nullptr */
    Topic *parent = nullptr;

    /* Trigger flag, cleared by default */
    bool triggered = false;

    /* Exact string matches */
    std::map<std::string_view, Topic *> children;

    /* Wildcard child */
    Topic *wildcardChild = nullptr;

    /* Terminating wildcard child */
    Topic *terminatingWildcardChild = nullptr;

    /* What we published */
    std::map<unsigned int, std::string> messages;

    std::set<Subscriber *> subs;
};

 

struct TopicTree {

private:

    std::function<int(Subscriber *, std::string_view)> cb;

 

    Topic *root = new Topic;

 

    /* Global messageId for deduplication of overlapping topics and ordering between topics */

    unsigned int messageId = 0;

 

    /* The triggered topics */

    Topic *triggeredTopics[64];

    int numTriggeredTopics = 0;

    Subscriber *min = (Subscriber *) UINTPTR_MAX;

WebSocketContextData.h:

namespace uWS {

 

template <bool, bool> struct WebSocket;

 

/* todo: this looks identical to WebSocketBehavior, why not just std::move that entire thing in? */

 

template <bool SSL>

struct WebSocketContextData {

    /* The callbacks for this context */

    fu2::unique_function<void(WebSocket<SSL, true> *, std::string_view, uWS::OpCode)> messageHandler = nullptr;

    fu2::unique_function<void(WebSocket<SSL, true> *)> drainHandler = nullptr;

    fu2::unique_function<void(WebSocket<SSL, true> *, int, std::string_view)> closeHandler = nullptr;

 

    /* Settings for this context */

    size_t maxPayloadLength = 0;

    int idleTimeout = 0;

 

    /* There needs to be a maxBackpressure which will force close everything over that limit */

    size_t maxBackpressure = 0;

 

    /* Each websocket context has a topic tree for pub/sub */

    TopicTree topicTree;

WebSocketContext.h:

namespace uWS {

 

template <bool SSL, bool isServer>

struct WebSocketContext {

    template <bool> friend struct TemplatedApp;

    template <bool, typename> friend struct WebSocketProtocol;

private:

    WebSocketContext() = delete;

 

    us_socket_context_t *getSocketContext() {

        return (us_socket_context_t *) this;

    }

 

    WebSocketContextData<SSL> *getExt() {

        return (WebSocketContextData<SSL> *) us_socket_context_ext(SSL, (us_socket_context_t *) this);

    }

 

public:

    /* WebSocket contexts are always child contexts to a HTTP context so no SSL options are needed as they are inherited */

    static WebSocketContext *create(Loop *loop, us_socket_context_t *parentSocketContext) {

        WebSocketContext *webSocketContext = (WebSocketContext *) us_create_child_socket_context(SSL, parentSocketContext, sizeof(WebSocketContextData<SSL>));

        if (!webSocketContext) {

            return nullptr;

        }

 

        /* Init socket context data */

        new ((WebSocketContextData<SSL> *) us_socket_context_ext(SSL, (us_socket_context_t *)webSocketContext)) WebSocketContextData<SSL>;

        return webSocketContext->init();

    }

WebSocketData.h:

namespace uWS {

 

struct WebSocketData : AsyncSocketData<false>, WebSocketState<true> {

    template <bool, bool> friend struct WebSocketContext;

    template <bool, bool> friend struct WebSocket;

private:

    std::string fragmentBuffer;

    int controlTipLength = 0;

    bool isShuttingDown = 0;

    enum CompressionStatus : char {

        DISABLED,

        ENABLED,

        COMPRESSED_FRAME

    } compressionStatus;

   

    /* We might have a dedicated compressor */

    DeflationStream *deflationStream = nullptr;

   

    /* We could be a subscriber */

    Subscriber *subscriber = nullptr;

WebSocketExtensions.h:

namespace uWS {

 

/* Option flags; every non-zero enumerator occupies its own bit */
enum Options : unsigned int {
    NO_OPTIONS = 0u,
    PERMESSAGE_DEFLATE = 1u << 0,
    SERVER_NO_CONTEXT_TAKEOVER = 1u << 1, // remove this
    CLIENT_NO_CONTEXT_TAKEOVER = 1u << 2, // remove this
    NO_DELAY = 1u << 3,
    SLIDING_DEFLATE_WINDOW = 1u << 4
};

 

/* Numeric identifiers for the extension names and parameters the parser knows.
 * NOTE(review): the values look like precomputed hashes of the token text -
 * confirm against the tokenizer before changing any of them. */
enum ExtensionTokens {

    TOK_PERMESSAGE_DEFLATE = 1838,

    TOK_SERVER_NO_CONTEXT_TAKEOVER = 2807,

    TOK_CLIENT_NO_CONTEXT_TAKEOVER = 2783,

    TOK_SERVER_MAX_WINDOW_BITS = 2372,

    TOK_CLIENT_MAX_WINDOW_BITS = 2348

};

 

struct ExtensionsParser {

private:

    int *lastInteger = nullptr;

 

public:

    bool perMessageDeflate = false;

    bool serverNoContextTakeover = false;

    bool clientNoContextTakeover = false;

    int serverMaxWindowBits = 0;

    int clientMaxWindowBits = 0;

 

template <bool isServer>

struct ExtensionsNegotiator {

protected:

    int options;

 

WebSocket.h:

namespace uWS {

 

template <bool SSL, bool isServer>

struct WebSocket : AsyncSocket<SSL> {

    template <bool> friend struct TemplatedApp;

private:

    typedef AsyncSocket<SSL> Super;

 

    void *init(bool perMessageDeflate, bool slidingCompression, std::string &&backpressure) {

        new (us_socket_ext(SSL, (us_socket_t *) this)) WebSocketData(perMessageDeflate, slidingCompression, std::move(backpressure));

        return this;

    }

public:

 

    /* Returns pointer to the per socket user data */

    void *getUserData() {

        WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) this);

        /* We just have it overallocated by sizeof type */

        return (webSocketData + 1);

    }

 

    /* See AsyncSocket */

    using Super::getBufferedAmount;

    using Super::getRemoteAddress;

 

    /* Simple, immediate close of the socket. Emits close event */

    using Super::close;

WebSocketProtocol.h:

namespace uWS {

 

/* Frame opcodes, one byte wide; the values match the opcode numbers of RFC 6455 */
enum OpCode : unsigned char {
    TEXT = 0x1,
    BINARY = 0x2,
    CLOSE = 0x8,
    PING = 0x9,
    PONG = 0xA
};

 

/* Role constants: CLIENT is 0, SERVER is 1 */
enum {
    CLIENT = 0,
    SERVER = 1
};

 

// 24 bytes perfectly

/* Parser state carried per WebSocket. The three header-size constants are 4
 * larger when isServer is true (6/8/14 versus 2/4/10); the same difference
 * shows up in the size of `mask`. */
template <bool isServer>

struct WebSocketState {

public:

    static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2;

    static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4;

    static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10;

 

    // 16 bytes

    struct State {

        /* Four bitfields, 8 bits in total */
        unsigned int wantsHead : 1;

        unsigned int spillLength : 4;

        int opStack : 2; // -1, 0, 1

        unsigned int lastFin : 1;

 

        // 15 bytes
        // (for isServer == true: 13 spill bytes plus two one-byte opcodes)

        unsigned char spill[LONG_MESSAGE_HEADER - 1];

        OpCode opCode[2];

 

        /* Initial state: a head is wanted, nothing spilled, empty op stack (-1), lastFin set */
        State() {

            wantsHead = true;

            spillLength = 0;

            opStack = -1;

            lastFin = true;

        }

 

    } state;

 

    // 8 bytes

    unsigned int remainingBytes = 0;

    /* 4 bytes when isServer is true, otherwise a single byte */
    char mask[isServer ? 4 : 1];

};

 

namespace protocol {

/* Fields of a close frame: a 16-bit code plus a message given as pointer and
 * length. NOTE(review): message is a raw pointer with a separate length, so it
 * is presumably non-owning and not null-terminated - confirm. */
struct CloseFrame {

    uint16_t code;

    char *message;

    size_t length;

};

 

}

 

// essentially this is only a parser

/* Compile-time constants of the protocol parser. The header sizes repeat the
 * values declared in WebSocketState; Impl is not referenced by anything shown
 * in this excerpt. */
template <const bool isServer, typename Impl>

struct WIN32_EXPORT WebSocketProtocol {

public:

    static const unsigned int SHORT_MESSAGE_HEADER = isServer ? 6 : 2;

    static const unsigned int MEDIUM_MESSAGE_HEADER = isServer ? 8 : 4;

    static const unsigned int LONG_MESSAGE_HEADER = isServer ? 14 : 10;

 

    /* Padding amounts; the pre-padding is one byte less than the longest header */
    static const int CONSUME_POST_PADDING = 4;

    static const int CONSUME_PRE_PADDING = LONG_MESSAGE_HEADER - 1;

};

 

}

给我留言

留言无头像?