diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2b2fcb557f..f2d844f73d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,6 +26,7 @@ typedef struct SCliConn { queue conn; char spi; char secured; + uint64_t expireTime; } SCliConn; typedef struct SCliMsg { @@ -39,10 +40,13 @@ typedef struct SCliThrdObj { pthread_t thread; uv_loop_t* loop; uv_async_t* cliAsync; // - void* cache; // conn pool + uv_timer_t* pTimer; + void* cache; // conn pool queue msg; pthread_mutex_t msgMtx; - void* shandle; + uint64_t nextTimeout; // next timeout + void* shandle; // + } SCliThrdObj; typedef struct SClientObj { @@ -63,6 +67,8 @@ static void* connCacheDestroy(void* cache); static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); +// register timer in each thread to clear expire conn +static void clientTimeoutCb(uv_timer_t* handle); // process data read from server, auth/decompress etc static void clientProcessData(SCliConn* conn); // check whether already read complete packet from server @@ -84,21 +90,55 @@ static void clientMsgDestroy(SCliMsg* pMsg); static void* clientThread(void* arg); static void clientProcessData(SCliConn* conn) { + STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; + SRpcInfo* pRpc = pCtx->ahandle; + SRpcMsg rpcMsg; + + rpcMsg.pCont = conn->readBuf.buf; + rpcMsg.contLen = conn->readBuf.len; + rpcMsg.ahandle = pCtx->ahandle; + (pRpc->cfp)(NULL, &rpcMsg, NULL); // impl } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); +static void clientTimeoutCb(uv_timer_t* handle) { + SCliThrdObj* pThrd = handle->data; + SRpcInfo* pRpc = pThrd->shandle; + int64_t currentTime = pThrd->nextTimeout; + + SConnList* p = taosHashIterate((SHashObj*)pThrd->cache, NULL); + while (p != NULL) { + while (!QUEUE_IS_EMPTY(&p->conn)) { + queue* h = QUEUE_HEAD(&p->conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + if (c->expireTime < currentTime) { + QUEUE_REMOVE(h); + clientConnDestroy(c); + } else { + break; + } + } + p = taosHashIterate((SHashObj*)pThrd->cache, p); + } + + pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + uv_timer_start(handle, clientTimeoutCb, pRpc->idleTime * 10, 0); +} static void* connCacheCreate(int size) { SHashObj* cache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); return false; } static void* connCacheDestroy(void* cache) { SConnList* connList = taosHashIterate((SHashObj*)cache, NULL); - while (!QUEUE_IS_EMPTY(&connList->conn)) { - queue* h = QUEUE_HEAD(&connList->conn); - QUEUE_REMOVE(h); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); - clientConnDestroy(c); + while (connList != NULL) { + while (!QUEUE_IS_EMPTY(&connList->conn)) { + queue* h = QUEUE_HEAD(&connList->conn); + QUEUE_REMOVE(h); + SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + clientConnDestroy(c); + } + connList = taosHashIterate((SHashObj*)cache, connList); } taosHashClear(cache); } @@ -129,8 +169,10 @@ static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - SHashObj* pCache = cache; - SConnList* plist = taosHashGet(pCache, key, strlen(key)); + STransConnCtx* ctx = ((SCliMsg*)conn->data)->ctx; + SRpcInfo* pRpc = ctx->pRpc; + conn->expireTime = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + SConnList* plist = taosHashGet((SHashObj*)cache, key, strlen(key)); // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -206,6 +248,7 @@ static void clientConnDestroy(SCliConn* conn) { } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; + QUEUE_REMOVE(&conn->conn); clientConnDestroy(conn); } @@ -279,6 +322,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->writeReq = malloc(sizeof(uv_write_t)); + QUEUE_INIT(&conn->conn); conn->connReq.data = conn; conn->data = pMsg; @@ -315,6 +359,9 @@ static void clientAsyncCb(uv_async_t* handle) { static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; + SRpcInfo* pRpc = pThrd->shandle; + pThrd->nextTimeout = taosGetTimestampMs() + pRpc->idleTime * 1000 * 10; + uv_timer_start(pThrd->pTimer, clientTimeoutCb, pRpc->idleTime * 10, 0); uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -336,7 +383,11 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); pThrd->cliAsync->data = pThrd; + pThrd->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pThrd->pTimer); + pThrd->shandle = shandle; + int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { tDebug("sucess to create tranport-client thread %d", i);