From 6be6027d1702eefe71de10dbbe8c08ee50d02273 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 8 Aug 2024 16:15:44 +0800 Subject: [PATCH 1/9] add mem check --- source/libs/transport/src/transCli.c | 94 ++++++++++++++++++++++----- source/libs/transport/src/transComm.c | 9 ++- source/libs/transport/src/transSvr.c | 4 +- 3 files changed, 87 insertions(+), 20 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4b324e52c6..aab23ec8a0 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -190,6 +190,7 @@ static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); +static void cliDestroyBatch(SCliBatch* pBatch); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); @@ -662,7 +663,7 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { if (QUEUE_IS_EMPTY(&plist->conns)) { if (plist->list->numOfConn >= pTranInst->connLimitNum) { *exceed = true; - return NULL;; + return NULL; } plist->list->numOfConn++; return NULL; @@ -708,7 +709,8 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; - if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) { + if (pTransInst->notWaitAvaliableConn || + (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) { tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); @@ -903,11 +905,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { exh->handle = conn; exh->pThrd = conn->hostThrd; taosWUnLockLatch(&exh->latch); - + conn->refId = exh->refId; taosWUnLockLatch(&exh->latch); - tDebug("conn %p specified by %"PRId64"", conn, handle); + tDebug("conn %p specified by %" PRId64 "", conn, handle); (void)transReleaseExHandle(transGetRefMgt(), handle); return 0; @@ -919,7 +921,6 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_ int32_t code = transAllocBuffer(pBuf, buf); if (code < 0) { tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); - // cliDestroyConn(conn, true); } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -1081,9 +1082,8 @@ static void cliDestroy(uv_handle_t* handle) { (void)atomic_sub_fetch_32(&pThrd->connCount, 1); if (conn->refId > 0) { - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); - + (void)transReleaseExHandle(transGetRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetRefMgt(), conn->refId); } taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); @@ -1149,6 +1149,7 @@ static void cliSendCb(uv_write_t* req, int status) { (void)uv_read_start((uv_stream_t*)pConn->stream, cliAllocRecvBufferCb, cliRecvCb); } void cliSendBatch(SCliConn* pConn) { + int32_t code = 0; SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -1158,8 +1159,13 @@ void cliSendBatch(SCliConn* pConn) { pBatch->pList->connCnt += 1; uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); - int i = 0; + if (wb == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); + goto _except; + } + int i = 0; queue* h = NULL; QUEUE_FOREACH(h, &pBatch->wq) { SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); @@ -1169,6 +1175,11 @@ void cliSendBatch(SCliConn* pConn) { STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); + if (pMsg->pCont == NULL) { + code = TSDB_CODE_OUT_OF_BUFFER; + tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); + goto _except; + } pMsg->contLen = 0; } @@ -1202,11 +1213,27 @@ void cliSendBatch(SCliConn* pConn) { } uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); + if (req == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); + taosMemoryFree(wb); + return; + } req->data = pConn; tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, pBatch->wLen, pBatch->batchSize); - (void)uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); + + code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); + if (code != 0) { + tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code)); + } taosMemoryFree(wb); + return; + +_except: + cliDestroyBatch(pBatch); + pConn->pBatch = NULL; + return; } void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; @@ -1626,7 +1653,7 @@ static void cliHandleFreeById(SCliMsg* pMsg, SCliThrd* pThrd) { } return; _exception: - tDebug("already free conn %p by id %" PRId64"", conn, refId); + tDebug("already free conn %p by id %" PRId64 "", conn, refId); (void)transReleaseExHandle(transGetRefMgt(), refId); destroyCmsg(pMsg); @@ -1920,6 +1947,7 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { STrans* pInst = pThrd->pTransInst; + int32_t code = 0; int count = 0; while (!QUEUE_IS_EMPTY(wq)) { @@ -1944,6 +1972,11 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); + if (pBatchList == NULL) { + tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + destroyCmsg(pMsg); + continue; + } QUEUE_INIT(&pBatchList->wq); pBatchList->connMax = pInst->connLimitNum; pBatchList->connCnt = 0; @@ -1952,23 +1985,45 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { pBatchList->ip = taosStrdup(ip); pBatchList->dst = taosStrdup(key); + if (pBatchList->ip == NULL || pBatchList->dst == NULL) { + taosMemoryFree(pBatchList->ip); + taosMemoryFree(pBatchList->dst); + taosMemoryFree(pBatchList); + destroyCmsg(pMsg); + tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + continue; + } pBatchList->port = port; SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); + if (pBatch == NULL) { + tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + destroyCmsg(pMsg); + continue; + } - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; pBatch->batchSize += pMsg->msg.contLen; pBatch->pList = pBatchList; QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - (void)taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); + code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); + if (code != 0) { + taosMemoryFree(pBatch); + taosMemoryFree(pBatchList->ip); + taosMemoryFree(pBatchList->dst); + taosMemoryFree(pBatchList); + tError("failed to put batch list to cache, reason:%s", tstrerror(code)); + destroyCmsg(pMsg); + continue; + } } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + if (pBatch == NULL) { + destroyCmsg(pMsg); + continue; + } QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->listq); @@ -1991,6 +2046,11 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { pBatch->wLen += 1; } else { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + if (pBatch == NULL) { + destroyCmsg(pMsg); + continue; + } + QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->listq); @@ -3269,7 +3329,7 @@ int32_t transFreeConnById(void* shandle, int64_t transpointId) { return TSDB_CODE_RPC_MODULE_QUIT; } if (transpointId == 0) { - tDebug("not free by refId:%"PRId64"", transpointId); + tDebug("not free by refId:%" PRId64 "", transpointId); TAOS_CHECK_GOTO(0, NULL, _exception); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 148f4d4e9a..69faf5a266 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -32,7 +32,7 @@ int32_t transCompressMsg(char* msg, int32_t len) { char* buf = taosMemoryMalloc(len + compHdr + 8); // 8 extra bytes if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d", len); + tWarn("failed to allocate memory for rpc msg compression, contLen:%d", len); ret = len; return ret; } @@ -206,6 +206,8 @@ int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { p->cap = p->left + p->len; p->buf = taosMemoryRealloc(p->buf, p->cap); if (p->buf == NULL) { + uvBuf->base = NULL; + uvBuf->len = 0; return TSDB_CODE_OUT_OF_MEMORY; } uvBuf->base = p->buf + p->len; @@ -439,7 +441,10 @@ void transReqQueueClear(queue* q) { } int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { - queue->q = taosArrayInit(2, sizeof(void*)); + queue->q = taosArrayInit(4, sizeof(void*)); + if (taosArrayReserve(queue->q, 4) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } queue->freeFunc = (void (*)(const void*))freeFunc; if (queue->q == NULL) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 0202fbc599..f3e4c93f1b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -205,7 +205,6 @@ void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b int32_t code = transAllocBuffer(pBuf, buf); if (code < 0) { tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); - // destroyConn(conn, true); } } @@ -542,6 +541,9 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->len = 2; buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len); + if (buf == NULL) { + tError("failed to alloc conn read buffer since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + } } void uvOnTimeoutCb(uv_timer_t* handle) { From 2af86c961a42d7d24e24e84013e03d68cc8c5047 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 8 Aug 2024 19:35:20 +0800 Subject: [PATCH 2/9] add mem check --- source/libs/transport/src/transCli.c | 3 +-- source/libs/transport/src/transComm.c | 7 ++----- source/libs/transport/src/transSvr.c | 2 +- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index aab23ec8a0..8aea8b7aaa 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -907,7 +907,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { taosWUnLockLatch(&exh->latch); conn->refId = exh->refId; - taosWUnLockLatch(&exh->latch); tDebug("conn %p specified by %" PRId64 "", conn, handle); @@ -1978,6 +1977,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { continue; } QUEUE_INIT(&pBatchList->wq); + pBatchList->port = port; pBatchList->connMax = pInst->connLimitNum; pBatchList->connCnt = 0; pBatchList->batchLenLimit = pInst->batchSize; @@ -1993,7 +1993,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); continue; } - pBatchList->port = port; SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); if (pBatch == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 69faf5a266..b940c494d8 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -441,15 +441,12 @@ void transReqQueueClear(queue* q) { } int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { - queue->q = taosArrayInit(4, sizeof(void*)); - if (taosArrayReserve(queue->q, 4) == NULL) { + queue->q = taosArrayInit(2, sizeof(void*)); + if (queue->q == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } queue->freeFunc = (void (*)(const void*))freeFunc; - if (queue->q == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } return 0; } bool transQueuePush(STransQueue* queue, void* arg) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f3e4c93f1b..c1b934c812 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -541,7 +541,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { buf->len = 2; buf->base = taosMemoryCalloc(1, sizeof(char) * buf->len); - if (buf == NULL) { + if (buf->base == NULL) { tError("failed to alloc conn read buffer since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); } } From 3eaae2ab7582b9911145389e481d6bd0f956515d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Aug 2024 10:40:38 +0800 Subject: [PATCH 3/9] add mem check --- source/libs/transport/src/transCli.c | 56 ++++++++++++++-------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8aea8b7aaa..d8b5cff94b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1164,7 +1164,7 @@ void cliSendBatch(SCliConn* pConn) { goto _except; } - int i = 0; + int i = 0; queue* h = NULL; QUEUE_FOREACH(h, &pBatch->wq) { SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); @@ -1215,8 +1215,7 @@ void cliSendBatch(SCliConn* pConn) { if (req == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - taosMemoryFree(wb); - return; + goto _except; } req->data = pConn; tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, @@ -1225,12 +1224,15 @@ void cliSendBatch(SCliConn* pConn) { code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); if (code != 0) { tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code)); + goto _except; } + taosMemoryFree(wb); return; _except: cliDestroyBatch(pBatch); + taosMemoryFree(wb); pConn->pBatch = NULL; return; } @@ -2033,33 +2035,31 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); (*ppBatchList)->len += 1; - - continue; - } - - queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); - SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { - QUEUE_PUSH(&pBatch->wq, h); - pBatch->batchSize += pMsg->msg.contLen; - pBatch->wLen += 1; } else { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - destroyCmsg(pMsg); - continue; + queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); + SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); + if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { + QUEUE_PUSH(&pBatch->wq, h); + pBatch->batchSize += pMsg->msg.contLen; + pBatch->wLen += 1; + } else { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + if (pBatch == NULL) { + destroyCmsg(pMsg); + continue; + } + + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; } - - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; } } continue; From 99fb80cd27fb6fe96e61365a369f451894b0607f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Aug 2024 18:07:43 +0800 Subject: [PATCH 4/9] refactor errno code --- source/libs/transport/src/transCli.c | 66 ++++++++++++++-------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d8b5cff94b..eead34b98a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1973,11 +1973,11 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - if (pBatchList == NULL) { - tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - destroyCmsg(pMsg); - continue; - } + // if (pBatchList == NULL) { + // tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + // destroyCmsg(pMsg); + // continue; + // } QUEUE_INIT(&pBatchList->wq); pBatchList->port = port; pBatchList->connMax = pInst->connLimitNum; @@ -1987,21 +1987,21 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { pBatchList->ip = taosStrdup(ip); pBatchList->dst = taosStrdup(key); - if (pBatchList->ip == NULL || pBatchList->dst == NULL) { - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - destroyCmsg(pMsg); - tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - continue; - } + // if (pBatchList->ip == NULL || pBatchList->dst == NULL) { + // taosMemoryFree(pBatchList->ip); + // taosMemoryFree(pBatchList->dst); + // taosMemoryFree(pBatchList); + // destroyCmsg(pMsg); + // tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + // continue; + // } SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - destroyCmsg(pMsg); - continue; - } + // if (pBatch == NULL) { + // tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + // destroyCmsg(pMsg); + // continue; + // } pBatch->batchSize += pMsg->msg.contLen; pBatch->pList = pBatchList; @@ -2010,21 +2010,21 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); if (code != 0) { - taosMemoryFree(pBatch); - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - tError("failed to put batch list to cache, reason:%s", tstrerror(code)); - destroyCmsg(pMsg); - continue; + // taosMemoryFree(pBatch); + // taosMemoryFree(pBatchList->ip); + // taosMemoryFree(pBatchList->dst); + // taosMemoryFree(pBatchList); + // tError("failed to put batch list to cache, reason:%s", tstrerror(code)); + // destroyCmsg(pMsg); + // continue; } } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - destroyCmsg(pMsg); - continue; - } + // if (pBatch == NULL) { + // destroyCmsg(pMsg); + // continue; + // } QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->listq); @@ -2044,10 +2044,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { pBatch->wLen += 1; } else { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - destroyCmsg(pMsg); - continue; - } + // if (pBatch == NULL) { + // destroyCmsg(pMsg); + // continue; + // } QUEUE_INIT(&pBatch->wq); QUEUE_INIT(&pBatch->listq); From c4cf02148ae5e2eae3ebffde1098d0ab2503ef70 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Aug 2024 18:51:21 +0800 Subject: [PATCH 5/9] refactor errno code --- source/libs/transport/src/transCli.c | 153 ++++++++++++++------------- 1 file changed, 82 insertions(+), 71 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index eead34b98a..b37494a730 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1946,6 +1946,68 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { return batch; } +static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { + SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); + if (pBatchList == NULL) { + tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + QUEUE_INIT(&pBatchList->wq); + pBatchList->port = port; + pBatchList->connMax = 1; + pBatchList->connCnt = 0; + pBatchList->batchLenLimit = 0; + pBatchList->len += 1; + + pBatchList->ip = taosStrdup(ip); + pBatchList->dst = taosStrdup(key); + if (pBatchList->ip == NULL || pBatchList->dst == NULL) { + taosMemoryFree(pBatchList->ip); + taosMemoryFree(pBatchList->dst); + taosMemoryFree(pBatchList); + tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + *ppBatchList = pBatchList; + return 0; +} +static int32_t destroyBatchList(SCliBatchList* pList) { + if (pList == NULL) { + return 0; + } + while (!QUEUE_IS_EMPTY(&pList->wq)) { + queue* h = QUEUE_HEAD(&pList->wq); + QUEUE_REMOVE(h); + + SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); + cliDestroyBatch(pBatch); + } + taosMemoryFree(pList->ip); + taosMemoryFree(pList->dst); + taosMemoryFree(pList); + return 0; +} +static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* pMsg) { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + if (pBatch == NULL) { + tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, &pMsg->q); + pBatch->wLen += 1; + pBatch->batchSize = pMsg->msg.contLen; + pBatch->pList = pList; + + QUEUE_PUSH(&pList->wq, &pBatch->listq); + pList->len += 1; + + *ppBatch = pBatch; + return 0; +} static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { STrans* pInst = pThrd->pTransInst; int32_t code = 0; @@ -1972,69 +2034,31 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { size_t klen = strlen(key); SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - // if (pBatchList == NULL) { - // tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // destroyCmsg(pMsg); - // continue; - // } - QUEUE_INIT(&pBatchList->wq); - pBatchList->port = port; - pBatchList->connMax = pInst->connLimitNum; - pBatchList->connCnt = 0; + SCliBatchList* pBatchList = NULL; + code = createBatchList(&pBatchList, key, ip, port); + if (code != 0) { + destroyCmsg(pMsg); + continue; + } pBatchList->batchLenLimit = pInst->batchSize; - pBatchList->len += 1; - pBatchList->ip = taosStrdup(ip); - pBatchList->dst = taosStrdup(key); - // if (pBatchList->ip == NULL || pBatchList->dst == NULL) { - // taosMemoryFree(pBatchList->ip); - // taosMemoryFree(pBatchList->dst); - // taosMemoryFree(pBatchList); - // destroyCmsg(pMsg); - // tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // continue; - // } - - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - // if (pBatch == NULL) { - // tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // destroyCmsg(pMsg); - // continue; - // } - - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = pBatchList; - - QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); + SCliBatch* pBatch = NULL; + code = createBatch(&pBatch, pBatchList, pMsg); + if (code != 0) { + destroyBatchList(pBatchList); + destroyCmsg(pMsg); + continue; + } code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); if (code != 0) { - // taosMemoryFree(pBatch); - // taosMemoryFree(pBatchList->ip); - // taosMemoryFree(pBatchList->dst); - // taosMemoryFree(pBatchList); - // tError("failed to put batch list to cache, reason:%s", tstrerror(code)); - // destroyCmsg(pMsg); - // continue; + destroyBatchList(pBatchList); } } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - // if (pBatch == NULL) { - // destroyCmsg(pMsg); - // continue; - // } - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize = pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; + SCliBatch* pBatch = NULL; + code = createBatch(&pBatch, *ppBatchList, pMsg); + if (code != 0) cliDestroyBatch(pBatch); } else { queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); @@ -2043,22 +2067,9 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { pBatch->batchSize += pMsg->msg.contLen; pBatch->wLen += 1; } else { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - // if (pBatch == NULL) { - // destroyCmsg(pMsg); - // continue; - // } - - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; + SCliBatch* tBatch = NULL; + code = createBatch(&tBatch, *ppBatchList, pMsg); + if (code != 0) cliDestroyBatch(pBatch); } } } From d072e3f7dfebcb161bba8fac4a52979e29a9a8eb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Aug 2024 09:12:41 +0800 Subject: [PATCH 6/9] refactor errno code --- source/libs/transport/src/transCli.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b37494a730..258f3b6fdb 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1971,9 +1971,8 @@ static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, *ppBatchList = pBatchList; return 0; } -static int32_t destroyBatchList(SCliBatchList* pList) { +static void destroyBatchList(SCliBatchList* pList) { if (pList == NULL) { - return 0; } while (!QUEUE_IS_EMPTY(&pList->wq)) { queue* h = QUEUE_HEAD(&pList->wq); @@ -1985,7 +1984,6 @@ static int32_t destroyBatchList(SCliBatchList* pList) { taosMemoryFree(pList->ip); taosMemoryFree(pList->dst); taosMemoryFree(pList); - return 0; } static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* pMsg) { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); From 1a2e233d09db612e3faabc458dfe81a50bd42513 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Aug 2024 09:23:03 +0800 Subject: [PATCH 7/9] refactor errno code --- source/libs/transport/src/transCli.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 258f3b6fdb..91318c2972 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1973,6 +1973,7 @@ static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, } static void destroyBatchList(SCliBatchList* pList) { if (pList == NULL) { + return; } while (!QUEUE_IS_EMPTY(&pList->wq)) { queue* h = QUEUE_HEAD(&pList->wq); @@ -2056,7 +2057,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { SCliBatch* pBatch = NULL; code = createBatch(&pBatch, *ppBatchList, pMsg); - if (code != 0) cliDestroyBatch(pBatch); + if (code != 0) { + destroyCmsg(pMsg); + cliDestroyBatch(pBatch); + } } else { queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); @@ -2067,7 +2071,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { } else { SCliBatch* tBatch = NULL; code = createBatch(&tBatch, *ppBatchList, pMsg); - if (code != 0) cliDestroyBatch(pBatch); + if (code != 0) { + destroyCmsg(pMsg); + cliDestroyBatch(pBatch); + } } } } From 6a7cee7687a08a4bf96a08fa19ff659abe8c3c9f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 10 Aug 2024 18:55:03 +0800 Subject: [PATCH 8/9] refactor errno code --- source/libs/transport/src/transCli.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 91318c2972..2cc89a2609 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1161,7 +1161,7 @@ void cliSendBatch(SCliConn* pConn) { if (wb == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - goto _except; + goto _exception; } int i = 0; @@ -1177,7 +1177,7 @@ void cliSendBatch(SCliConn* pConn) { if (pMsg->pCont == NULL) { code = TSDB_CODE_OUT_OF_BUFFER; tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - goto _except; + goto _exception; } pMsg->contLen = 0; } @@ -1215,7 +1215,7 @@ void cliSendBatch(SCliConn* pConn) { if (req == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; tError("%s conn %p failed to send batch msg since:%s", CONN_GET_INST_LABEL(pConn), pConn, tstrerror(code)); - goto _except; + goto _exception; } req->data = pConn; tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, @@ -1224,13 +1224,13 @@ void cliSendBatch(SCliConn* pConn) { code = uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); if (code != 0) { tDebug("%s conn %p failed to to send batch msg since %s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(code)); - goto _except; + goto _exception; } taosMemoryFree(wb); return; -_except: +_exception: cliDestroyBatch(pBatch); taosMemoryFree(wb); pConn->pBatch = NULL; From 593ccf20547e0a4fcbae2b8addbc9db9c4995bdf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 12 Aug 2024 11:45:30 +0800 Subject: [PATCH 9/9] refactor code --- source/libs/transport/src/transCli.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2cc89a2609..6006d7af7a 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2073,7 +2073,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { code = createBatch(&tBatch, *ppBatchList, pMsg); if (code != 0) { destroyCmsg(pMsg); - cliDestroyBatch(pBatch); } } }