From 60b48e3ee0b06a7e6591ff5e8d88c584c6ae68d4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sun, 16 Jan 2022 11:58:45 +0800 Subject: [PATCH] refactor code --- source/libs/transport/inc/rpcHead.h | 26 - source/libs/transport/inc/rpcTcp.h | 4 - source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/rpcCache.c | 4 - source/libs/transport/src/rpcMain.c | 439 +------------- source/libs/transport/src/rpcTcp.c | 8 - source/libs/transport/src/rpcUdp.c | 4 - source/libs/transport/src/transport.c | 697 +++++++++++++++++++++++ 8 files changed, 727 insertions(+), 456 deletions(-) diff --git a/source/libs/transport/inc/rpcHead.h b/source/libs/transport/inc/rpcHead.h index 66821db133..5ddf1a83c9 100644 --- a/source/libs/transport/inc/rpcHead.h +++ b/source/libs/transport/inc/rpcHead.h @@ -21,31 +21,6 @@ extern "C" { #endif -#ifdef USE_UV -typedef struct { - char version : 4; // RPC version - char comp : 4; // compression algorithm, 0:no compression 1:lz4 - char resflag : 2; // reserved bits - char spi : 3; // security parameter index - char encrypt : 3; // encrypt algorithm, 0: no encryption - uint16_t tranId; // transcation ID - uint32_t linkUid; // for unique connection ID assigned by client - uint64_t ahandle; // ahandle assigned by client - uint32_t sourceId; // source ID, an index for connection list - uint32_t destId; // destination ID, an index for connection list - uint32_t destIp; // destination IP address, for NAT scenario - char user[TSDB_UNI_LEN]; // user ID - uint16_t port; // for UDP only, port may be changed - char empty[1]; // reserved - uint16_t msgType; // message type - int32_t msgLen; // message length including the header iteslf - uint32_t msgVer; - int32_t code; // code in response message - uint8_t content[0]; // message body starts from here -} SRpcHead; - -#else - #define RPC_CONN_TCP 2 extern int tsRpcOverhead; @@ -96,7 +71,6 @@ typedef struct { } SRpcDigest; #pragma pack(pop) -#endif #ifdef __cplusplus } diff --git a/source/libs/transport/inc/rpcTcp.h b/source/libs/transport/inc/rpcTcp.h index 5e5c43a1db..ad42307516 100644 --- a/source/libs/transport/inc/rpcTcp.h +++ b/source/libs/transport/inc/rpcTcp.h @@ -21,8 +21,6 @@ extern "C" { #endif -#ifdef USE_UV -#else void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThreads, void *fp, void *shandle); void taosStopTcpServer(void *param); void taosCleanUpTcpServer(void *param); @@ -35,8 +33,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin void taosCloseTcpConnection(void *chandle); int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle); -#endif - #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 067b371b84..f93753cfe9 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -16,6 +16,7 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ +#include "rpcHead.h" #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/transport/src/rpcCache.c b/source/libs/transport/src/rpcCache.c index 40767d2ba5..1db2808126 100644 --- a/source/libs/transport/src/rpcCache.c +++ b/source/libs/transport/src/rpcCache.c @@ -22,9 +22,6 @@ #include "ttimer.h" #include "tutil.h" -#ifdef USE_UV - -#else typedef struct SConnHash { char fqdn[TSDB_FQDN_LEN]; uint16_t port; @@ -295,4 +292,3 @@ static void rpcUnlockCache(int64_t *lockedBy) { assert(false); } } -#endif diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 37ef10ba5b..f381768a34 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -13,9 +13,6 @@ * along with this program. If not, see . */ -#ifdef USE_UV -#include -#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -36,6 +33,17 @@ #include "ttimer.h" #include "tutil.h" +static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT; + +int tsRpcMaxUdpSize = 15000; // bytes +int tsProgressTimer = 100; +// not configurable +int tsRpcMaxRetry; +int tsRpcHeadSize; +int tsRpcOverhead; + +#ifndef USE_UV + typedef struct { int sessions; // number of sessions allowed int numOfThreads; // number of threads to process incoming messages @@ -51,28 +59,28 @@ typedef struct { char secret[TSDB_PASSWORD_LEN]; // secret for the link char ckey[TSDB_PASSWORD_LEN]; // ciphering key - void (*cfp)(void* parent, SRpcMsg*, SEpSet*); - int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + void (*cfp)(void *parent, SRpcMsg *, SEpSet *); + int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t refCount; - void* parent; - void* idPool; // handle to ID pool - void* tmrCtrl; // handle to timer - SHashObj* hash; // handle returned by hash utility - void* tcphandle; // returned handle from TCP initialization - void* udphandle; // returned handle from UDP initialization - void* pCache; // connection cache + void * parent; + void * idPool; // handle to ID pool + void * tmrCtrl; // handle to timer + SHashObj * hash; // handle returned by hash utility + void * tcphandle; // returned handle from TCP initialization + void * udphandle; // returned handle from UDP initialization + void * pCache; // connection cache pthread_mutex_t mutex; - struct SRpcConn* connList; // connection list + struct SRpcConn *connList; // connection list } SRpcInfo; typedef struct { - SRpcInfo* pRpc; // associated SRpcInfo + SRpcInfo * pRpc; // associated SRpcInfo SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - struct SRpcConn* pConn; // pConn allocated + void * ahandle; // handle provided by app + struct SRpcConn *pConn; // pConn allocated tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app + uint8_t * pCont; // content provided by app int32_t contLen; // content length int32_t code; // error code int16_t numOfTry; // number of try for different servers @@ -80,394 +88,14 @@ typedef struct { int8_t redirect; // flag to indicate redirect int8_t connType; // connection type int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API - SEpSet* pSet; // for synchronous API + SRpcMsg * pRsp; // for synchronous API + tsem_t * pSem; // for synchronous API + SEpSet * pSet; // for synchronous API char msg[0]; // RpcHead starts from here } SRpcReqContext; -#ifdef USE_UV - -#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) - -#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) -static const char* notify = "a"; - -typedef struct SThreadObj { - pthread_t thread; - uv_pipe_t* pipe; - uv_loop_t* loop; - uv_async_t* workerAsync; // - int fd; - queue conn; - pthread_mutex_t connMtx; -} SThreadObj; - -typedef struct SServerObj { - pthread_t thread; - uv_tcp_t server; - uv_loop_t* loop; - int workerIdx; - int numOfThread; - SThreadObj** pThreadObj; - uv_pipe_t** pipe; - uint32_t ip; - uint32_t port; -} SServerObj; - -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - -typedef struct SConnCtx { - uv_tcp_t* pTcp; - uv_write_t* pWriter; - uv_timer_t* pTimer; - - uv_async_t* pWorkerAsync; - queue queue; - int ref; - int persist; // persist connection or not - SConnBuffer connBuf; - int count; -} SConnCtx; - -static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void uvOnTimeoutCb(uv_timer_t* handle); -static void uvOnWriteCb(uv_write_t* req, int status); -static void uvOnAcceptCb(uv_stream_t* stream, int status); -static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); -static void uvWorkerAsyncCb(uv_async_t* handle); - -static SConnCtx* connCtxCreate(); -static void connCtxDestroy(SConnCtx* ctx); -static void uvConnCtxDestroy(uv_handle_t* handle); - -static void* workerThread(void* arg); -static void* acceptThread(void* arg); - -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); - -int32_t rpcInit() { return -1; } -void rpcCleanup() { return; }; - -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - // opte -} -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SServerObj* srv = calloc(1, sizeof(SServerObj)); - srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThread = numOfThreads; - srv->workerIdx = 0; - srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); - srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); - srv->ip = ip; - srv->port = port; - uv_loop_init(srv->loop); - - for (int i = 0; i < srv->numOfThread; i++) { - SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); - int fds[2]; - if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - return NULL; - } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - thrd->fd = fds[0]; - thrd->pipe = &(srv->pipe[i][1]); // init read - int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); - if (err == 0) { - tDebug("sucess to create worker-thread %d", i); - // printf("thread %d create\n", i); - } else { - // TODO: clear all other resource later - tError("failed to create worker-thread %d", i); - } - srv->pThreadObj[i] = thrd; - } - - int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); - if (err == 0) { - tDebug("success to create accept-thread"); - } else { - // clear all resource later - } - - return srv; -} -void* rpcOpen(const SRpcInit* pInit) { - SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) { - return NULL; - } - if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); - } - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); - return pRpc; -} - -void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - static const int CAPACITY = 1024; - /* - * formate of data buffer: - * |<-------SRpcReqContext------->|<------------data read from socket----------->| - */ - - SConnCtx* ctx = handle->data; - SConnBuffer* pBuf = &ctx->connBuf; - if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); - pBuf->len = 0; - pBuf->cap = CAPACITY; - pBuf->left = -1; - - buf->base = pBuf->buf + RPC_RESERVE_SIZE; - buf->len = CAPACITY; - } else { - if (pBuf->len >= pBuf->cap) { - if (pBuf->left == -1) { - pBuf->cap *= 2; - pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); - } else if (pBuf->len + pBuf->left > pBuf->cap) { - pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); - } - } - buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; - buf->len = pBuf->cap - pBuf->len; - } -} -// check data read from socket completely or not -// -static bool isReadAll(SConnBuffer* data) { - // TODO(yihao): handle pipeline later - SRpcHead rpcHead; - int32_t headLen = sizeof(rpcHead); - if (data->len >= headLen) { - memcpy((char*)&rpcHead, data->buf, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); - if (msgLen > data->len) { - data->left = msgLen - data->len; - return false; - } else { - return true; - } - } else { - return false; - } -} - -void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SConnCtx* ctx = cli->data; - SConnBuffer* pBuf = &ctx->connBuf; - if (nread > 0) { - pBuf->len += nread; - if (isReadAll(pBuf)) { - tDebug("alread read complete packet"); - } else { - tDebug("read half packet, continue to read"); - } - return; - } - - if (nread != UV_EOF) { - tDebug("Read error %s\n", uv_err_name(nread)); - } - uv_close((uv_handle_t*)cli, uvConnCtxDestroy); -} -void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(sizeof(char)); - buf->len = 2; -} - -void uvOnTimeoutCb(uv_timer_t* handle) { - // opt - tDebug("time out"); -} - -void uvOnWriteCb(uv_write_t* req, int status) { - SConnCtx* ctx = req->data; - if (status == 0) { - tDebug("data already was written on stream"); - } else { - connCtxDestroy(ctx); - } - // opt -} - -void uvWorkerAsyncCb(uv_async_t* handle) { - SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); - SConnCtx* conn = NULL; - - // opt later - pthread_mutex_lock(&pObj->connMtx); - if (!QUEUE_IS_EMPTY(&pObj->conn)) { - queue* head = QUEUE_HEAD(&pObj->conn); - conn = QUEUE_DATA(head, SConnCtx, queue); - QUEUE_REMOVE(&conn->queue); - } - pthread_mutex_unlock(&pObj->connMtx); - if (conn == NULL) { - tError("except occurred, do nothing"); - return; - } -} - -void uvOnAcceptCb(uv_stream_t* stream, int status) { - if (status == -1) { - return; - } - SServerObj* pObj = container_of(stream, SServerObj, server); - - uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pObj->loop, cli); - - if (uv_accept(stream, (uv_stream_t*)cli) == 0) { - uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); - - uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); - - pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; - tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); - uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); - } else { - uv_close((uv_handle_t*)cli, NULL); - } -} -void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { - tDebug("connection coming"); - if (nread < 0) { - if (nread != UV_EOF) { - tError("read error %s", uv_err_name(nread)); - } - // TODO(log other failure reason) - uv_close((uv_handle_t*)q, NULL); - return; - } - // free memory allocated by - assert(nread == strlen(notify)); - assert(buf->base[0] == notify[0]); - free(buf->base); - - SThreadObj* pObj = (SThreadObj*)container_of(q, struct SThreadObj, pipe); - - uv_pipe_t* pipe = (uv_pipe_t*)q; - if (!uv_pipe_pending_count(pipe)) { - tError("No pending count"); - return; - } - - uv_handle_type pending = uv_pipe_pending_type(pipe); - assert(pending == UV_TCP); - - SConnCtx* pConn = connCtxCreate(); - /* init conn timer*/ - pConn->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pObj->loop, pConn->pTimer); - - pConn->pWorkerAsync = pObj->workerAsync; // thread safty - - // init client handle - pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pObj->loop, pConn->pTcp); - pConn->pTcp->data = pConn; - - // init write request, just - pConn->pWriter = calloc(1, sizeof(uv_write_t)); - pConn->pWriter->data = pConn; - - if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { - uv_os_fd_t fd; - uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); - tDebug("new connection created: %d", fd); - uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); - } else { - connCtxDestroy(pConn); - } -} - -void* acceptThread(void* arg) { - // opt - SServerObj* srv = (SServerObj*)arg; - uv_tcp_init(srv->loop, &srv->server); - - struct sockaddr_in bind_addr; - - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); - return NULL; - } - uv_run(srv->loop, UV_RUN_DEFAULT); -} -void* workerThread(void* arg) { - SThreadObj* pObj = (SThreadObj*)arg; - - pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pObj->loop); - - uv_pipe_init(pObj->loop, pObj->pipe, 1); - uv_pipe_open(pObj->pipe, pObj->fd); - - QUEUE_INIT(&pObj->conn); - - pObj->workerAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb); - - uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb); - uv_run(pObj->loop, UV_RUN_DEFAULT); -} -static SConnCtx* connCtxCreate() { - SConnCtx* pConn = (SConnCtx*)calloc(1, sizeof(SConnCtx)); - return pConn; -} -static void connCtxDestroy(SConnCtx* ctx) { - if (ctx == NULL) { - return; - } - uv_timer_stop(ctx->pTimer); - free(ctx->pTimer); - uv_close((uv_handle_t*)ctx->pTcp, NULL); - free(ctx->pTcp); - free(ctx->pWriter); - free(ctx); - // handle -} -static void uvConnCtxDestroy(uv_handle_t* handle) { - SConnCtx* ctx = handle->data; - connCtxDestroy(ctx); -} -void rpcClose(void* arg) { return; } -void* rpcMallocCont(int contLen) { return NULL; } -void rpcFreeCont(void* cont) { return; } -void* rpcReallocCont(void* ptr, int contLen) { return NULL; } - -void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } - -void rpcSendResponse(const SRpcMsg* pMsg) {} - -void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } - -#else - #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) +#define rpcHeadFromCont(cont) ((SRpcHead *)((char *)cont - sizeof(SRpcHead))) #define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) #define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) #define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) @@ -510,15 +138,6 @@ typedef struct SRpcConn { SRpcReqContext *pContext; // request context } SRpcConn; -static pthread_once_t tsRpcInitOnce = PTHREAD_ONCE_INIT; - -int tsRpcMaxUdpSize = 15000; // bytes -int tsProgressTimer = 100; -// not configurable -int tsRpcMaxRetry; -int tsRpcHeadSize; -int tsRpcOverhead; - static int tsRpcRefId = -1; static int32_t tsRpcNum = 0; // static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT; diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index 9fa51a6fdc..81c464a661 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -14,9 +14,6 @@ */ #include "rpcTcp.h" -#ifdef USE_UV -#include -#endif #include "os.h" #include "rpcHead.h" #include "rpcLog.h" @@ -24,9 +21,6 @@ #include "taoserror.h" #include "tutil.h" -#ifdef USE_UV - -#else typedef struct SFdObj { void * signature; SOCKET fd; // TCP socket FD @@ -662,5 +656,3 @@ static void taosFreeFdObj(SFdObj *pFdObj) { tfree(pFdObj); } - -#endif diff --git a/source/libs/transport/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c index 79956cc98d..b57cf57c55 100644 --- a/source/libs/transport/src/rpcUdp.c +++ b/source/libs/transport/src/rpcUdp.c @@ -22,9 +22,6 @@ #include "ttimer.h" #include "tutil.h" -#ifdef USE_UV -// no support upd currently -#else #define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_PKTS 1000 #define RPC_UDP_BUF_TIME 5 // mseconds @@ -260,4 +257,3 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c return ret; } -#endif diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transport.c index f2f48bbc8a..a6c9cee0b7 100644 --- a/source/libs/transport/src/transport.c +++ b/source/libs/transport/src/transport.c @@ -12,3 +12,700 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#ifdef USE_UV + +#include +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" +#include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmd5.h" +#include "tmempool.h" +#include "tmsg.h" +#include "transportInt.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) +static const char* notify = "a"; + +typedef struct { + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; + uint16_t localPort; + int8_t connType; + int index; // for UDP server only, round robin for multiple threads + char label[TSDB_LABEL_LEN]; + + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key + + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); + int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); + + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization + void* udphandle; // returned handle from UDP initialization + void* pCache; // connection cache + pthread_mutex_t mutex; + struct SRpcConn* connList; // connection list +} SRpcInfo; + +typedef struct { + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + SEpSet* pSet; // for synchronous API + char msg[0]; // RpcHead starts from here +} SRpcReqContext; + +typedef struct SThreadObj { + pthread_t thread; + uv_pipe_t* pipe; + int fd; + uv_loop_t* loop; + uv_async_t* workerAsync; // + queue conn; + pthread_mutex_t connMtx; + void* shandle; +} SThreadObj; + +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) +#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) +#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) +#define rpcIsReq(type) (type & 1U) + +typedef struct SServerObj { + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + int workerIdx; + int numOfThread; + SThreadObj** pThreadObj; + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; +} SServerObj; + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + +typedef struct SRpcConn { + uv_tcp_t* pTcp; + uv_write_t* pWriter; + uv_timer_t* pTimer; + + uv_async_t* pWorkerAsync; + queue queue; + int ref; + int persist; // persist connection or not + SConnBuffer connBuf; + int count; + void* shandle; // rpc init + void* ahandle; + + // del later + char secured; + int spi; + char info[64]; + char user[TSDB_UNI_LEN]; // user ID for the link + char secret[TSDB_PASSWORD_LEN]; + char ckey[TSDB_PASSWORD_LEN]; // ciphering key +} SRpcConn; + +// auth function +static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen); +// compress data +static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); + +static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void uvOnTimeoutCb(uv_timer_t* handle); +static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnAcceptCb(uv_stream_t* stream, int status); +static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void uvWorkerAsyncCb(uv_async_t* handle); + +static SRpcConn* connCreate(); +static void connDestroy(SRpcConn* conn); +static void uvConnDestroy(uv_handle_t* handle); + +static void* workerThread(void* arg); +static void* acceptThread(void* arg); + +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); + +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SServerObj* srv = calloc(1, sizeof(SServerObj)); + srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + srv->numOfThread = numOfThreads; + srv->workerIdx = 0; + srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThread, sizeof(SThreadObj*)); + srv->pipe = (uv_pipe_t**)calloc(srv->numOfThread, sizeof(uv_pipe_t*)); + srv->ip = ip; + srv->port = port; + uv_loop_init(srv->loop); + + for (int i = 0; i < srv->numOfThread; i++) { + SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + int fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + return NULL; + } + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + thrd->shandle = shandle; + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create worker-thread %d", i); + // printf("thread %d create\n", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread %d", i); + } + srv->pThreadObj[i] = thrd; + } + + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept-thread"); + } else { + // clear all resource later + } + + return srv; +} +void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + /* + * formate of data buffer: + * |<-------SRpcReqContext------->|<------------data read from socket----------->| + */ + static const int CAPACITY = 1024; + + SRpcConn* ctx = handle->data; + SConnBuffer* pBuf = &ctx->connBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; + + buf->base = pBuf->buf + RPC_RESERVE_SIZE; + buf->len = CAPACITY; + } else { + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); + } + } + buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; + buf->len = pBuf->cap - pBuf->len; + } +} +// check data read from socket completely or not +// +static bool isReadAll(SConnBuffer* data) { + // TODO(yihao): handle pipeline later + SRpcHead rpcHead; + int32_t headLen = sizeof(rpcHead); + if (data->len >= headLen) { + memcpy((char*)&rpcHead, data->buf, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; + return false; + } else { + return true; + } + } else { + return false; + } +} +static void uvDoProcess(SRecvInfo* pRecv) { + SRpcHead* pHead = (SRpcHead*)pRecv->msg; + SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; + SRpcConn* pConn = pRecv->thandle; + + tDump(pRecv->msg, pRecv->msgLen); + + terrno = 0; + SRpcReqContext* pContest; + + // do auth and check +} +static int uvAuthData(SRpcConn* pConn, char* msg, int len) { + SRpcHead* pHead = (SRpcHead*)msg; + int code = 0; + + if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { + // secured link, or no authentication + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, secured link, no auth is required", pConn->info); + return 0; + } + + if (!rpcIsReq(pHead->msgType)) { + // for response, if code is auth failure, it shall bypass the auth process + code = htonl(pHead->code); + if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || + code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); + return 0; + } + } + + code = 0; + if (pHead->spi == pConn->spi) { + // authentication + SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest)); + + int32_t delta; + delta = (int32_t)htonl(pDigest->timeStamp); + delta -= (int32_t)taosGetTimestampSec(); + if (abs(delta) > 900) { + tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); + code = TSDB_CODE_RPC_INVALID_TIME_STAMP; + } else { + if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + // tDebug("%s, authentication failed, msg discarded", pConn->info); + code = TSDB_CODE_RPC_AUTH_FAILURE; + } else { + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); + if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client + // tTrace("%s, message is authenticated", pConn->info); + } + } + } else { + tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); + code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; + } + + return code; +} +static void uvProcessData(SRpcConn* ctx) { + SRecvInfo info; + SRecvInfo* p = &info; + SConnBuffer* pBuf = &ctx->connBuf; + p->msg = pBuf->buf + RPC_RESERVE_SIZE; + p->msgLen = pBuf->len; + p->ip = 0; + p->port = 0; + p->shandle = ctx->shandle; // + p->thandle = ctx; + p->chandle = NULL; + + // + SRpcHead* pHead = (SRpcHead*)p->msg; + assert(rpcIsReq(pHead->msgType)); + + SRpcInfo* pRpc = (SRpcInfo*)p->shandle; + SRpcConn* pConn = (SRpcConn*)p->thandle; + + pConn->ahandle = (void*)pHead->ahandle; + pHead->code = htonl(pHead->code); + + SRpcMsg rpcMsg; + + pHead = rpcDecompressRpcMsg(pHead); + rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); + rpcMsg.pCont = pHead->content; + rpcMsg.msgType = pHead->msgType; + rpcMsg.code = pHead->code; + rpcMsg.ahandle = pConn->ahandle; + rpcMsg.handle = pConn; + (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); + // auth + // validate msg type +} +void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // opt + SRpcConn* ctx = cli->data; + SConnBuffer* pBuf = &ctx->connBuf; + if (nread > 0) { + pBuf->len += nread; + if (isReadAll(pBuf)) { + tDebug("alread read complete packet"); + uvProcessData(ctx); + } else { + tDebug("read half packet, continue to read"); + } + return; + } + + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } + uv_close((uv_handle_t*)cli, uvConnDestroy); +} +void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(sizeof(char)); + buf->len = 2; +} + +void uvOnTimeoutCb(uv_timer_t* handle) { + // opt + tDebug("time out"); +} + +void uvOnWriteCb(uv_write_t* req, int status) { + SRpcConn* conn = req->data; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + connDestroy(conn); + } + // opt +} + +void uvWorkerAsyncCb(uv_async_t* handle) { + SThreadObj* pObj = container_of(handle, SThreadObj, workerAsync); + SRpcConn* conn = NULL; + + // opt later + pthread_mutex_lock(&pObj->connMtx); + if (!QUEUE_IS_EMPTY(&pObj->conn)) { + queue* head = QUEUE_HEAD(&pObj->conn); + conn = QUEUE_DATA(head, SRpcConn, queue); + QUEUE_REMOVE(&conn->queue); + } + pthread_mutex_unlock(&pObj->connMtx); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } +} + +void uvOnAcceptCb(uv_stream_t* stream, int status) { + if (status == -1) { + return; + } + SServerObj* pObj = container_of(stream, SServerObj, server); + + uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, cli); + + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { + uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); + + uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); + + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThread; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); + } else { + uv_close((uv_handle_t*)cli, NULL); + } +} +void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + tDebug("connection coming"); + if (nread < 0) { + if (nread != UV_EOF) { + tError("read error %s", uv_err_name(nread)); + } + // TODO(log other failure reason) + uv_close((uv_handle_t*)q, NULL); + return; + } + // free memory allocated by + assert(nread == strlen(notify)); + assert(buf->base[0] == notify[0]); + free(buf->base); + + SThreadObj* pObj = q->data; + + uv_pipe_t* pipe = (uv_pipe_t*)q; + if (!uv_pipe_pending_count(pipe)) { + tError("No pending count"); + return; + } + + uv_handle_type pending = uv_pipe_pending_type(pipe); + assert(pending == UV_TCP); + + SRpcConn* pConn = connCreate(); + pConn->shandle = pObj->shandle; + /* init conn timer*/ + pConn->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pObj->loop, pConn->pTimer); + + pConn->pWorkerAsync = pObj->workerAsync; // thread safty + + // init client handle + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, pConn->pTcp); + pConn->pTcp->data = pConn; + + // init write request, just + pConn->pWriter = calloc(1, sizeof(uv_write_t)); + pConn->pWriter->data = pConn; + + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { + uv_os_fd_t fd; + uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); + tDebug("new connection created: %d", fd); + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + } else { + connDestroy(pConn); + } +} + +void* acceptThread(void* arg) { + // opt + SServerObj* srv = (SServerObj*)arg; + uv_tcp_init(srv->loop, &srv->server); + + struct sockaddr_in bind_addr; + + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); +} +void* workerThread(void* arg) { + SThreadObj* pObj = (SThreadObj*)arg; + + pObj->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pObj->loop); + + uv_pipe_init(pObj->loop, pObj->pipe, 1); + uv_pipe_open(pObj->pipe, pObj->fd); + + pObj->pipe->data = pObj; + + QUEUE_INIT(&pObj->conn); + + pObj->workerAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pObj->loop, pObj->workerAsync, uvWorkerAsyncCb); + + uv_read_start((uv_stream_t*)pObj->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + uv_run(pObj->loop, UV_RUN_DEFAULT); +} +static SRpcConn* connCreate() { + SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn)); + return pConn; +} +static void connDestroy(SRpcConn* conn) { + if (conn == NULL) { + return; + } + uv_timer_stop(conn->pTimer); + free(conn->pTimer); + uv_close((uv_handle_t*)conn->pTcp, NULL); + free(conn->pTcp); + free(conn->pWriter); + free(conn); + // handle +} +static void uvConnDestroy(uv_handle_t* handle) { + SRpcConn* conn = handle->data; + connDestroy(conn); +} +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { + return NULL; + } + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->tcphandle = taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + return pRpc; +} +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { return NULL; } +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { return; } + +void rpcSendResponse(const SRpcMsg* pMsg) {} + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } + +static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + int ret = -1; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; + + return ret; +} +static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + memcpy(pAuth, context.digest, sizeof(context.digest)); +} + +static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) { + SRpcHead* pHead = (SRpcHead*)msg; + + if (pConn->spi && pConn->secured == 0) { + // add auth part + pHead->spi = pConn->spi; + SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); + pDigest->timeStamp = htonl(taosGetTimestampSec()); + msgLen += sizeof(SRpcDigest); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + } else { + pHead->spi = 0; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + + return msgLen; +} + +static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { + SRpcHead* pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char* buf = malloc(contLen + overhead + 8); // 8 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen > 0 && compLen < contLen - overhead) { + SRpcComp* pComp = (SRpcComp*)pCont; + pComp->reserved = 0; + pComp->contLen = htonl(contLen); + memcpy(pCont + overhead, buf, compLen); + + pHead->comp = 1; + tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); + finalLen = compLen + overhead; + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { + int overhead = sizeof(SRpcComp); + SRpcHead* pNewHead = NULL; + uint8_t* pCont = pHead->content; + SRpcComp* pComp = (SRpcComp*)pHead->content; + + if (pHead->comp) { + // decompress the content + assert(pComp->reserved == 0); + int contLen = htonl(pComp->contLen); + + // prepare the temporary buffer to decompress message + char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); + pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext + + if (pNewHead) { + int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; + int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); + assert(origLen == contLen); + + memcpy(pNewHead, pHead, sizeof(SRpcHead)); + pNewHead->msgLen = rpcMsgLenFromCont(origLen); + /// rpcFreeMsg(pHead); // free the compressed message buffer + pHead = pNewHead; + tTrace("decomp malloc mem:%p", temp); + } else { + tError("failed to allocate memory to decompress msg, contLen:%d", contLen); + } + } + + return pHead; +} +int32_t rpcInit(void) { + // impl later + return -1; +} + +void rpcCleanup(void) { + // impl later + return; +} +#endif