diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0cc9fb8619..c4a3cad28a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -58,7 +58,7 @@ extern int32_t tMsgDict[]; #define TMSG_INFO(TYPE) \ ((TYPE) < TDMT_DND_MAX_MSG || (TYPE) < TDMT_MND_MAX_MSG || (TYPE) < TDMT_VND_MAX_MSG || (TYPE) < TDMT_SCH_MAX_MSG || \ (TYPE) < TDMT_STREAM_MAX_MSG || (TYPE) < TDMT_MON_MAX_MSG || (TYPE) < TDMT_SYNC_MAX_MSG) || \ - (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG \ + (TYPE) < TDMT_VND_STREAM_MSG || (TYPE) < TDMT_VND_TMQ_MSG || (TYPE) < TDMT_VND_TMQ_MAX_MSG \ ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ : 0 diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 07046eb041..67fa1e7781 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -138,6 +138,12 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p->mgmtEp = epSet; taosThreadMutexInit(&p->qnodeMutex, NULL); p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2); + if (p->pTransporter == NULL) { + taosThreadMutexUnlock(&appInfo.mutex); + taosMemoryFreeClear(key); + taosMemoryFree(p); + return NULL; + } p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { destroyAppInst(p); @@ -1463,6 +1469,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { tscError("failed to sched msg to tsc, tsc ready to quit"); rpcFreeCont(pMsg->pCont); taosMemoryFree(arg->pEpset); + destroySendMsgInfo(pMsg->info.ahandle); taosMemoryFree(arg); } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 82c4ef8278..7e718e8d2c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1311,7 +1311,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < cli->numOfThreads; i++) { SCliThrd* pThrd = createThrdObj(shandle); - int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); + if (pThrd == NULL) { + return NULL; + } + + int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err == 0) { tDebug("success to create tranport-cli thread:%d", i); } @@ -1368,9 +1372,23 @@ static SCliThrd* createThrdObj(void* trans) { taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - + int err = uv_loop_init(pThrd->loop); + if (err != 0) { + tError("failed to init uv_loop, reason:%s", uv_err_name(err)); + taosMemoryFree(pThrd->loop); + taosThreadMutexDestroy(&pThrd->msgMtx); + taosMemoryFree(pThrd); + return NULL; + } pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); + if (pThrd->asyncPool == NULL) { + tError("failed to init async pool"); + uv_loop_close(pThrd->loop); + taosMemoryFree(pThrd->loop); + taosThreadMutexDestroy(&pThrd->msgMtx); + taosMemoryFree(pThrd); + return NULL; + } pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 41233a6526..4c107a88f1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -218,24 +218,37 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); - for (int i = 0; i < pool->nAsync; i++) { + int i = 0, err = 0; + for (i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); item->pThrd = arg; QUEUE_INIT(&item->qmsg); taosThreadMutexInit(&item->mtx, NULL); - uv_async_t* async = &(pool->asyncs[i]); - uv_async_init(loop, async, cb); async->data = item; + err = uv_async_init(loop, async, cb); + if (err != 0) { + tError("failed to init async, reason:%s", uv_err_name(err)); + break; + } } + + if (i != pool->nAsync) { + transAsyncPoolDestroy(pool); + pool = NULL; + } + return pool; } void transAsyncPoolDestroy(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; + if (item == NULL) continue; + taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); }