diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index eead34b98a..b37494a730 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1946,6 +1946,68 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { return batch; } +static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { + SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); + if (pBatchList == NULL) { + tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + QUEUE_INIT(&pBatchList->wq); + pBatchList->port = port; + pBatchList->connMax = 1; + pBatchList->connCnt = 0; + pBatchList->batchLenLimit = 0; + pBatchList->len += 1; + + pBatchList->ip = taosStrdup(ip); + pBatchList->dst = taosStrdup(key); + if (pBatchList->ip == NULL || pBatchList->dst == NULL) { + taosMemoryFree(pBatchList->ip); + taosMemoryFree(pBatchList->dst); + taosMemoryFree(pBatchList); + tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + *ppBatchList = pBatchList; + return 0; +} +static int32_t destroyBatchList(SCliBatchList* pList) { + if (pList == NULL) { + return 0; + } + while (!QUEUE_IS_EMPTY(&pList->wq)) { + queue* h = QUEUE_HEAD(&pList->wq); + QUEUE_REMOVE(h); + + SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); + cliDestroyBatch(pBatch); + } + taosMemoryFree(pList->ip); + taosMemoryFree(pList->dst); + taosMemoryFree(pList); + return 0; +} +static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliMsg* pMsg) { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + if (pBatch == NULL) { + tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, &pMsg->q); + pBatch->wLen += 1; + pBatch->batchSize = pMsg->msg.contLen; + pBatch->pList = pList; + + QUEUE_PUSH(&pList->wq, &pBatch->listq); + pList->len += 1; + + *ppBatch = pBatch; + return 0; +} static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { STrans* pInst = pThrd->pTransInst; int32_t code = 0; @@ -1972,69 +2034,31 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { size_t klen = strlen(key); SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - // if (pBatchList == NULL) { - // tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // destroyCmsg(pMsg); - // continue; - // } - QUEUE_INIT(&pBatchList->wq); - pBatchList->port = port; - pBatchList->connMax = pInst->connLimitNum; - pBatchList->connCnt = 0; + SCliBatchList* pBatchList = NULL; + code = createBatchList(&pBatchList, key, ip, port); + if (code != 0) { + destroyCmsg(pMsg); + continue; + } pBatchList->batchLenLimit = pInst->batchSize; - pBatchList->len += 1; - pBatchList->ip = taosStrdup(ip); - pBatchList->dst = taosStrdup(key); - // if (pBatchList->ip == NULL || pBatchList->dst == NULL) { - // taosMemoryFree(pBatchList->ip); - // taosMemoryFree(pBatchList->dst); - // taosMemoryFree(pBatchList); - // destroyCmsg(pMsg); - // tError("failed to create batch list, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // continue; - // } - - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - // if (pBatch == NULL) { - // tError("failed to create batch, reason:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // destroyCmsg(pMsg); - // continue; - // } - - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = pBatchList; - - QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); + SCliBatch* pBatch = NULL; + code = createBatch(&pBatch, pBatchList, pMsg); + if (code != 0) { + destroyBatchList(pBatchList); + destroyCmsg(pMsg); + continue; + } code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); if (code != 0) { - // taosMemoryFree(pBatch); - // taosMemoryFree(pBatchList->ip); - // taosMemoryFree(pBatchList->dst); - // taosMemoryFree(pBatchList); - // tError("failed to put batch list to cache, reason:%s", tstrerror(code)); - // destroyCmsg(pMsg); - // continue; + destroyBatchList(pBatchList); } } else { if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - // if (pBatch == NULL) { - // destroyCmsg(pMsg); - // continue; - // } - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize = pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; + SCliBatch* pBatch = NULL; + code = createBatch(&pBatch, *ppBatchList, pMsg); + if (code != 0) cliDestroyBatch(pBatch); } else { queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); @@ -2043,22 +2067,9 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { pBatch->batchSize += pMsg->msg.contLen; pBatch->wLen += 1; } else { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - // if (pBatch == NULL) { - // destroyCmsg(pMsg); - // continue; - // } - - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; + SCliBatch* tBatch = NULL; + code = createBatch(&tBatch, *ppBatchList, pMsg); + if (code != 0) cliDestroyBatch(pBatch); } } }