diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5f1c19a95b..2d537132c4 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -114,7 +114,7 @@ typedef struct SCliThrd { SDelayQueue* timeoutQueue; SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout - void* pTransInst; // + STrans* pTransInst; // int connCount; void (*destroyAhandleFp)(void* ahandle); @@ -2052,12 +2052,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { } static void* cliWorkThread(void* arg) { + char threadName[TSDB_LABEL_LEN] = {0}; + SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - char threadName[TSDB_LABEL_LEN] = {0}; - STrans* pInst = pThrd->pTransInst; - strtolower(threadName, pInst->label); + strtolower(threadName, pThrd->pTransInst->label); setThreadName(threadName); uv_run(pThrd->loop, UV_RUN_DEFAULT); @@ -2121,11 +2121,9 @@ static FORCE_INLINE void destroyCmsg(void* arg) { taosMemoryFree(pMsg); } static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { - SCliMsg* pMsg = arg; - if (pMsg == NULL) { - return; - } + if (arg == NULL) return; + SCliMsg* pMsg = arg; SCliThrd* pThrd = param; if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); @@ -2168,8 +2166,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { code = uv_loop_init(pThrd->loop); if (code != 0) { tError("failed to init uv_loop, reason:%s", uv_err_name(code)); - code = TSDB_CODE_THIRDPARTY_ERROR; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); } int32_t nSync = pTransInst->supportBatch ? 4 : 8; @@ -2188,8 +2185,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { code = uv_prepare_init(pThrd->loop, pThrd->prepare); if (code != 0) { tError("failed to create prepre since:%s", uv_err_name(code)); - code = TSDB_CODE_THIRDPARTY_ERROR; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); } pThrd->prepare->data = pThrd; @@ -2197,14 +2193,13 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); if (pThrd->timerList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); if (timer == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } uv_timer_init(pThrd->loop, timer); taosArrayPush(pThrd->timerList, &timer); @@ -2213,7 +2208,7 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->pool = createConnPool(4); if (pThrd->pool == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } if ((code = transDQCreate(pThrd->loop, &pThrd->delayQueue)) != 0) { TAOS_CHECK_GOTO(code, NULL, _end); @@ -2230,19 +2225,16 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->fqdn2ipCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->failFastCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); if (pThrd->batchCache == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); @@ -2324,12 +2316,23 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) { taosMemoryFree(ctx); } -void cliSendQuit(SCliThrd* thrd) { +int32_t cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully + int32_t code = 0; SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (msg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + msg->type = Quit; - transAsyncSend(thrd->asyncPool, &msg->q); + if ((code = transAsyncSend(thrd->asyncPool, &msg->q)) != 0) { + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + taosMemoryFree(msg); + return code; + } + atomic_store_8(&thrd->asyncPool->stop, 1); + return 0; } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { @@ -2653,9 +2656,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } void transCloseClient(void* arg) { + int32_t code = 0; SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - cliSendQuit(cli->pThreadObj[i]); + code = cliSendQuit(cli->pThreadObj[i]); + if (code != 0) { + tError("failed to send quit to thread:%d, reason:%s", i, tstrerror(code)); + } + destroyThrdObj(cli->pThreadObj[i]); } taosMemoryFree(cli->pThreadObj);