From affa8a7ef275b7304cfc7f7d321d0dc5b3971234 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 21 Jan 2022 22:51:59 +0800 Subject: [PATCH 1/2] refactor rpc --- source/libs/transport/src/transCli.c | 35 +++++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 00bc1b621f..bf395d62e5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,6 +17,8 @@ #include "transComm.h" +#define CONN_PERSIST_TIME(para) (para * 1000 * 100) + typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; @@ -102,6 +104,9 @@ static void clientProcessData(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { + uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + } free(pCtx->ip); free(pCtx); // impl @@ -112,6 +117,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; + tDebug("timeout, try to remove expire conn from connCache"); SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); while (p != NULL) { @@ -128,8 +134,8 @@ static void clientTimeoutCb(uv_timer_t* handle) { p = taosHashIterate((SHashObj*)pThrd->cache, p); } - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; - uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); + uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } static void* connCacheCreate(int size) { SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -158,8 +164,7 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { SConnList* plist = taosHashGet(pCache, key, strlen(key)); if (plist == NULL) { SConnList list; - plist = &list; - taosHashPut(pCache, key, strlen(key), plist, sizeof(*plist)); + taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list)); plist = taosHashGet(pCache, key, strlen(key)); QUEUE_INIT(&plist->conn); } @@ -177,7 +182,7 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; - conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); // list already create before assert(plist != NULL); @@ -374,14 +379,13 @@ static void clientAsyncCb(uv_async_t* handle) { clientHandleReq(pMsg, pThrd); count++; if (count >= 2) { - tError("send batch size: %d", count); + tDebug("send batch size: %d", count); } } } static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -409,8 +413,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, pThrd->pTimer = malloc(sizeof(uv_timer_t)); uv_timer_init(pThrd->loop, pThrd->pTimer); pThrd->pTimer->data = pThrd; - pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); pThrd->cache = connCacheCreate(1); pThrd->pTransInst = shandle; @@ -426,16 +430,19 @@ static void clientMsgDestroy(SCliMsg* pMsg) { // impl later free(pMsg); } +static void destroyThrdObj(SCliThrdObj* pThrd) { + pthread_join(pThrd->thread, NULL); + pthread_mutex_destroy(&pThrd->msgMtx); + free(pThrd->cliAsync); + free(pThrd->loop); + free(pThrd); +} +// void taosCloseClient(void* arg) { // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* pThrd = cli->pThreadObj[i]; - pthread_join(pThrd->thread, NULL); - pthread_mutex_destroy(&pThrd->msgMtx); - free(pThrd->cliAsync); - free(pThrd->loop); - free(pThrd); + destroyThrdObj(cli->pThreadObj[i]); } free(cli->pThreadObj); free(cli); From 046794047b9fea16ab465ca6eb88d0aa1413fcc9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 22 Jan 2022 12:09:34 +0800 Subject: [PATCH 2/2] refactor uv code --- source/libs/transport/src/transCli.c | 102 +++++++++++++++------------ source/libs/transport/src/transSrv.c | 82 ++++++++++++++------- 2 files changed, 115 insertions(+), 69 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bf395d62e5..ffd8d35bfc 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -44,7 +44,7 @@ typedef struct SCliThrdObj { uv_loop_t* loop; uv_async_t* cliAsync; // uv_timer_t* pTimer; - void* cache; // conn pool + void* pool; // conn pool queue msg; pthread_mutex_t msgMtx; uint64_t nextTimeout; // next timeout @@ -65,10 +65,10 @@ typedef struct SConnList { // conn pool // add expire timeout and capacity limit -static void* connCacheCreate(int size); -static void* connCacheDestroy(void* cache); -static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); -static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +static void* connPoolCreate(int size); +static void* connPoolDestroy(void* pool); +static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); +static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); // register timer in each thread to clear expire conn static void clientTimeoutCb(uv_timer_t* handle); @@ -90,8 +90,14 @@ static void clientConnDestroy(SCliConn* pConn); static void clientMsgDestroy(SCliMsg* pMsg); +// thread obj +static SCliThrdObj* createThrdObj(); +static void destroyThrdObj(SCliThrdObj* pThrd); +// thread static void* clientThread(void* arg); +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); + static void clientProcessData(SCliConn* conn) { STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; SRpcInfo* pRpc = pCtx->pRpc; @@ -103,7 +109,7 @@ static void clientProcessData(SCliConn* conn) { (pRpc->cfp)(NULL, &rpcMsg, NULL); SCliThrdObj* pThrd = conn->hostThrd; - addConnToCache(pThrd->cache, pCtx->ip, pCtx->port, conn); + addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } @@ -111,15 +117,14 @@ static void clientProcessData(SCliConn* conn) { free(pCtx); // impl } -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tDebug("timeout, try to remove expire conn from connCache"); + tDebug("timeout, try to remove expire conn from conn pool"); - SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); + SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { while (!QUEUE_IS_EMPTY(&p->conn)) { queue* h = QUEUE_HEAD(&p->conn); @@ -131,18 +136,18 @@ static void clientTimeoutCb(uv_timer_t* handle) { break; } } - p = taosHashIterate((SHashObj*)pThrd->cache, p); + p = taosHashIterate((SHashObj*)pThrd->pool, p); } pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -static void* connCacheCreate(int size) { - SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - return cache; +static void* connPoolCreate(int size) { + SHashObj* pool = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + return pool; } -static void* connCacheDestroy(void* cache) { - SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); +static void* connPoolDestroy(void* pool) { + SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); @@ -150,22 +155,22 @@ static void* connCacheDestroy(void* cache) { SCliConn* c = QUEUE_DATA(h, SCliConn, conn); clientConnDestroy(c); } - connList = taosHashIterate((SHashObj*)cache, connList); + connList = taosHashIterate((SHashObj*)pool, connList); } - taosHashClear(cache); + taosHashClear(pool); } -static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { +static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { char key[128] = {0}; tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - SHashObj* pCache = cache; - SConnList* plist = taosHashGet(pCache, key, strlen(key)); + SHashObj* pPool = pool; + SConnList* plist = taosHashGet(pPool, key, strlen(key)); if (plist == NULL) { SConnList list; - taosHashPut(pCache, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pCache, key, strlen(key)); + taosHashPut(pPool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pPool, key, strlen(key)); QUEUE_INIT(&plist->conn); } @@ -176,14 +181,14 @@ static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { QUEUE_REMOVE(h); return QUEUE_DATA(h, SCliConn, conn); } -static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { +static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { char key[128] = {0}; tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -327,7 +332,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; - SCliConn* conn = getConnFromCache(pThrd->cache, pCtx->ip, pCtx->port); + SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later conn->data = pMsg; @@ -378,9 +383,9 @@ static void clientAsyncCb(uv_async_t* handle) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); clientHandleReq(pMsg, pThrd); count++; - if (count >= 2) { - tDebug("send batch size: %d", count); - } + } + if (count >= 2) { + tDebug("already process batch size: %d", count); } } @@ -398,24 +403,8 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); - - QUEUE_INIT(&pThrd->msg); - pthread_mutex_init(&pThrd->msgMtx, NULL); - - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - - pThrd->cliAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); - pThrd->cliAsync->data = pThrd; - - pThrd->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pThrd->pTimer); - pThrd->pTimer->data = pThrd; - + SCliThrdObj* pThrd = createThrdObj(); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); - pThrd->cache = connCacheCreate(1); pThrd->pTransInst = shandle; int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); @@ -430,7 +419,30 @@ static void clientMsgDestroy(SCliMsg* pMsg) { // impl later free(pMsg); } +static SCliThrdObj* createThrdObj() { + SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + pThrd->cliAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); + pThrd->cliAsync->data = pThrd; + + pThrd->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->pTimer); + pThrd->pTimer->data = pThrd; + + pThrd->pool = connPoolCreate(1); + return pThrd; +} static void destroyThrdObj(SCliThrdObj* pThrd) { + if (pThrd == NULL) { + return; + } pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); free(pThrd->cliAsync); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 77b5f635f4..dafb809e2a 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -91,10 +91,14 @@ static SConn* connCreate(); static void connDestroy(SConn* conn); static void uvConnDestroy(uv_handle_t* handle); -// server worke thread +// server and worker thread static void* workerThread(void* arg); static void* acceptThread(void* arg); +// add handle loop +static bool addHandleToWorkloop(void* arg); +static bool addHandleToAcceptloop(void* arg); + void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { /* * formate of data buffer: @@ -268,7 +272,7 @@ static void uvProcessData(SConn* pConn) { rpcMsg.handle = pConn; (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); + // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type } @@ -451,22 +455,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void* acceptThread(void* arg) { // opt SServerObj* srv = (SServerObj*)arg; - uv_tcp_init(srv->loop, &srv->server); - - struct sockaddr_in bind_addr; - - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); - return NULL; - } uv_run(srv->loop, UV_RUN_DEFAULT); } -static void initWorkThrdObj(SWorkThrdObj* pThrd) { +static bool addHandleToWorkloop(void* arg) { + SWorkThrdObj* pThrd = arg; pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); + if (0 != uv_loop_init(pThrd->loop)) { + return false; + } // SRpcInfo* pRpc = pThrd->shandle; uv_pipe_init(pThrd->loop, pThrd->pipe, 1); @@ -482,6 +478,31 @@ static void initWorkThrdObj(SWorkThrdObj* pThrd) { pThrd->workerAsync->data = pThrd; uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + return true; +} + +static bool addHandleToAcceptloop(void* arg) { + // impl later + SServerObj* srv = arg; + + int err = 0; + if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) { + tError("failed to init accept server: %s", uv_err_name(err)); + return false; + } + + struct sockaddr_in bind_addr; + + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { + tError("failed to bind: %s", uv_err_name(err)); + return false; + } + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { + tError("failed to listen: %s", uv_err_name(err)); + return false; + } + return true; } void* workerThread(void* arg) { SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; @@ -546,11 +567,12 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); int fds[2]; if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - return NULL; + goto End; } uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write @@ -559,7 +581,9 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, thrd->fd = fds[0]; thrd->pipe = &(srv->pipe[i][1]); // init read - initWorkThrdObj(thrd); + if (false == addHandleToWorkloop(thrd)) { + goto End; + } int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); if (err == 0) { tDebug("sucess to create worker-thread %d", i); @@ -568,9 +592,10 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, // TODO: clear all other resource later tError("failed to create worker-thread %d", i); } - srv->pThreadObj[i] = thrd; } - + if (false == addHandleToAcceptloop(srv)) { + goto End; + } int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); if (err == 0) { tDebug("success to create accept-thread"); @@ -579,16 +604,25 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return srv; +End: + taosCloseServer(srv); + return NULL; +} + +void destroyWorkThrd(SWorkThrdObj* pThrd) { + if (pThrd == NULL) { + return; + } + pthread_join(pThrd->thread, NULL); + // free(srv->pipe[i]); + free(pThrd->loop); + free(pThrd); } void taosCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { - SWorkThrdObj* pThrd = srv->pThreadObj[i]; - pthread_join(pThrd->thread, NULL); - free(srv->pipe[i]); - free(pThrd->loop); - free(pThrd); + destroyWorkThrd(srv->pThreadObj[i]); } free(srv->loop); free(srv->pipe);