diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d5739cde58..8f83b1bde2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -713,10 +713,22 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } arg->param1 = *pMsg; arg->param2 = pThrd; - (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + if (task == NULL) { + taosMemoryFree(arg); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } + (*pMsg)->ctx->task = task; tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; @@ -724,9 +736,23 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { // send msg in delay queue if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } arg->param1 = *pMsg; arg->param2 = pThrd; - (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + if (task == NULL) { + taosMemoryFree(arg); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } + + (*pMsg)->ctx->task = task; tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); @@ -766,7 +792,11 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } - allocConnRef(conn, true); + int32_t code = allocConnRef(conn, true); + if (code != 0) { + cliDestroyConn(conn, true); + return; + } SCliThrd* thrd = conn->hostThrd; if (conn->timer != NULL) { @@ -811,6 +841,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + if (arg == NULL) return; arg->param1 = conn; arg->param2 = thrd; @@ -826,9 +857,12 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { } SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + exh->refId = transAddExHandle(transGetRefMgt(), exh); - SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); - if (self != exh) { + if (exh->refId < 0) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; } @@ -838,8 +872,14 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->handle = conn; exh->pThrd = conn->hostThrd; + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (self != exh) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + conn->refId = exh->refId; - if (conn->refId == -1) { + if (conn->refId < 0) { taosMemoryFree(exh); } return 0; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 8a64b7d808..aa6b377423 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -637,6 +637,10 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) { SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); + if (task == NULL) { + return NULL; + } + task->func = func; task->arg = arg; task->execTime = now + timeoutMs;