diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 6096f69ca2..d19877dbf1 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -507,6 +507,7 @@ static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t p } } static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ) { + int32_t code = 0; char buf[256] = {0}; sprintf(buf, "%s:%d", server, port); @@ -514,7 +515,9 @@ static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, (void)taosHashRemove(pTable, buf, strlen(buf)); } else { int32_t st = taosGetTimestampSec(); - (void)taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st)); + if ((code = taosHashPut(pTable, buf, strlen(buf), &st, sizeof(st))) != 0) { + tError("http-report failed to update conn status, dst:%s, reason:%s", buf, tstrerror(code)); + } } return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 073d6c0f17..dfa9595eb0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -332,6 +332,21 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { transQueueClear(&conn->cliMsgs); memset(&conn->ctx, 0, sizeof(conn->ctx)); } +void cliResetTimer(SCliThrd* pThrd, SCliConn* conn) { + if (conn->timer) { + if (uv_is_active((uv_handle_t*)conn->timer)) { + tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); + (void)uv_timer_stop(conn->timer); + } + if (taosArrayPush(pThrd->timerList, &conn->timer) == NULL) { + tError("failed to push timer %p to list, reason:%s", conn->timer, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + conn->timer = NULL; + return; + } + conn->timer->data = NULL; + conn->timer = NULL; + } +} bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; @@ -376,15 +391,7 @@ void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - if (conn->timer) { - if (uv_is_active((uv_handle_t*)conn->timer)) { - tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); - (void)uv_timer_stop(conn->timer); - } - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } + cliResetTimer(pThrd, conn); STransMsgHead* pHead = NULL; @@ -593,8 +600,8 @@ void cliConnTimeout(uv_timer_t* handle) { (void)uv_timer_stop(handle); handle->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; + + cliResetTimer(pThrd, conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, UV_ECANCELED); @@ -643,13 +650,16 @@ void* destroyConnPool(SCliThrd* pThrd) { } static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { + int32_t code = 0; void* pool = pThrd->pool; STrans* pTranInst = pThrd->pTransInst; size_t klen = strlen(key); SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; - (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + if ((code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list))) != 0) { + return NULL; + } plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); @@ -686,13 +696,17 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { } static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { + int32_t code = 0; void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; size_t klen = strlen(key); SConnList* plist = taosHashGet((SHashObj*)pool, key, klen); if (plist == NULL) { SConnList list = {0}; - (void)taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list)); + if ((code = taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list))) != 0) { + tError("failed to put key %s to pool, reason:%s", key, tstrerror(code)); + return NULL; + } plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); @@ -805,12 +819,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { } SCliThrd* thrd = conn->hostThrd; - if (conn->timer != NULL) { - (void)uv_timer_stop(conn->timer); - (void)taosArrayPush(thrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } + cliResetTimer(thrd, conn); + if (T_REF_VAL_GET(conn) > 1) { transUnrefCliHandle(conn); } @@ -1053,12 +1063,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transDQCancel(pThrd->timeoutQueue, conn->task); conn->task = NULL; } - if (conn->timer != NULL) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - } + cliResetTimer(pThrd, conn); if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { @@ -1073,12 +1078,7 @@ static void cliDestroy(uv_handle_t* handle) { } SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; - if (conn->timer != NULL) { - (void)uv_timer_stop(conn->timer); - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } + cliResetTimer(pThrd, conn); (void)atomic_sub_fetch_32(&pThrd->connCount, 1); @@ -1385,10 +1385,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { uint32_t ipaddr = 0; if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; + cliResetTimer(pThrd, conn); cliHandleFastFail(conn, code); return; } @@ -1421,10 +1418,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; + cliResetTimer(pThrd, conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, -1); @@ -1502,7 +1496,10 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - (void)taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); + int32_t code = taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem)); + if (code != 0) { + tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code)); + } } } } else { @@ -1522,10 +1519,7 @@ void cliConnCb(uv_connect_t* req, int status) { if (pConn->timer == NULL) { timeout = true; } else { - (void)uv_timer_stop(pConn->timer); - pConn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &pConn->timer); - pConn->timer = NULL; + cliResetTimer(pThrd, pConn); } STUB_RAND_NETWORK_ERR(status); @@ -1870,11 +1864,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); if (code != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetTimer(pThrd, conn); cliHandleExcept(conn, code); return; } @@ -1910,10 +1900,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - (void)uv_timer_stop(conn->timer); - conn->timer->data = NULL; - (void)taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; + cliResetTimer(pThrd, conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, ret); @@ -2377,7 +2364,9 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } (void)uv_timer_init(pThrd->loop, timer); - (void)taosArrayPush(pThrd->timerList, &timer); + if (taosArrayPush(pThrd->timerList, &timer) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } } pThrd->pool = createConnPool(4); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index a0836eae3d..1758ca65cc 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -375,11 +375,10 @@ void transCtxMerge(STransCtx* dst, STransCtx* src) { STransCtxVal* sVal = (STransCtxVal*)iter; key = taosHashGetKey(sVal, &klen); - // STransCtxVal* dVal = taosHashGet(dst->args, key, klen); - // if (dVal) { - // dst->freeFunc(dVal->val); - // } - (void)taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); + int32_t code = taosHashPut(dst->args, key, klen, sVal, sizeof(*sVal)); + if (code != 0) { + tError("failed to put val to hash, reason:%s", tstrerror(code)); + } iter = taosHashIterate(src->args, iter); } taosHashCleanup(src->args); @@ -453,7 +452,9 @@ bool transQueuePush(STransQueue* queue, void* arg) { if (queue->q == NULL) { return true; } - (void)taosArrayPush(queue->q, &arg); + if (taosArrayPush(queue->q, &arg) == NULL) { + return false; + } if (taosArrayGetSize(queue->q) > 1) { return false; }