From cdf362b58861a6c36c3c14c8bdacb36add09986c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Feb 2023 12:08:16 +0800 Subject: [PATCH 1/4] fix: fix fd limit crash --- source/client/src/clientImpl.c | 1 + source/libs/transport/src/transCli.c | 23 ++++++++++++++++++++--- source/libs/transport/src/transComm.c | 21 +++++++++++++++++---- 3 files changed, 38 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b5b99e92b0..09f614ddb2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1460,6 +1460,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { tscError("failed to sched msg to tsc, tsc ready to quit"); rpcFreeCont(pMsg->pCont); taosMemoryFree(arg->pEpset); + destroySendMsgInfo(pMsg->info.ahandle); taosMemoryFree(arg); } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1a99db5f99..954303e3c1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1275,7 +1275,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < cli->numOfThreads; i++) { SCliThrd* pThrd = createThrdObj(shandle); - int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); + if (pThrd == NULL) { + return NULL; + } + + int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err == 0) { tDebug("success to create tranport-cli thread:%d", i); } @@ -1332,9 +1336,22 @@ static SCliThrd* createThrdObj(void* trans) { taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - + int err = uv_loop_init(pThrd->loop); + if (err != 0) { + tError("failed to init uv_loop, reason:%s", uv_err_name(err)); + taosMemoryFree(pThrd->loop); + taosThreadMutexDestroy(&pThrd->msgMtx); + taosMemoryFree(pThrd); + return NULL; + } pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); + if (pThrd->asyncPool == NULL) { + uv_loop_close(pThrd->loop); + taosMemoryFree(pThrd->loop); + taosThreadMutexDestroy(&pThrd->msgMtx); + taosMemoryFree(pThrd); + return NULL; + } pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1161ed7c00..8c9e8f5a60 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -214,24 +214,37 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); - for (int i = 0; i < pool->nAsync; i++) { + int i = 0, err = 0; + for (i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); item->pThrd = arg; QUEUE_INIT(&item->qmsg); taosThreadMutexInit(&item->mtx, NULL); - uv_async_t* async = &(pool->asyncs[i]); - uv_async_init(loop, async, cb); async->data = item; + err = uv_async_init(loop, async, cb); + if (err != 0) { + tError("failed to init async, reason:%s", uv_err_name(err)); + break; + } } + + if (i != pool->nAsync) { + transAsyncPoolDestroy(pool); + pool = NULL; + } + return pool; } void transAsyncPoolDestroy(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; + if (item == NULL) continue; + taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); } From 9f62cfd6f330599b6424c82e495557b195712395 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Feb 2023 12:12:58 +0800 Subject: [PATCH 2/4] ref log --- source/libs/transport/src/transCli.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 954303e3c1..9626c92216 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1346,6 +1346,7 @@ static SCliThrd* createThrdObj(void* trans) { } pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); if (pThrd->asyncPool == NULL) { + tError("failed to init async pool"); uv_loop_close(pThrd->loop); taosMemoryFree(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); From e14994c23f9db8fcf0498ded13244d283492d71d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Feb 2023 12:45:20 +0800 Subject: [PATCH 3/4] fix: fail fast --- source/client/src/clientImpl.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 09f614ddb2..3c26e674b4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -138,6 +138,12 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p->mgmtEp = epSet; taosThreadMutexInit(&p->qnodeMutex, NULL); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2); + if (p->pTransporter == NULL) { + taosThreadMutexUnlock(&appInfo.mutex); + taosMemoryFreeClear(key); + taosMemoryFree(p); + return NULL; + } p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { destroyAppInst(p); From 383bcea0cbf51e1270f8b26b063539e030e5089a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 3 Feb 2023 14:17:36 +0800 Subject: [PATCH 4/4] fix err log --- include/common/tmsg.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0cc9fb8619..c4a3cad28a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -58,7 +58,7 @@ extern int32_t tMsgDict[]; #define TMSG_INFO(TYPE) \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \ - (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \ + (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG || (TYPE) < TDMT_VND_TMQ_MAX_MSG \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ : 0