diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 468d9d9b50..398ce32b17 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -224,8 +224,6 @@ static void cliRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); -SCliBatch* cliGetHeadFromList(SCliBatchList* pList); - static void destroyCliConnQTable(SCliConn* conn); static void cliHandleException(SCliConn* conn); @@ -1299,8 +1297,8 @@ static void cliHandleException(SCliConn* conn) { if (conn->registered) { int8_t ref = transGetRefCount(conn); if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { -// tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, -// conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); + // tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, + // conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); uv_close((uv_handle_t*)conn->stream, cliDestroy); } } @@ -2124,144 +2122,7 @@ static void cliDoReq(queue* wq, SCliThrd* pThrd) { tTrace("cli process batch size:%d", count); } } -SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { - if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) { - return NULL; - } - queue* hr = QUEUE_HEAD(&pList->wq); - QUEUE_REMOVE(hr); - pList->sending += 1; - pList->len -= 1; - - SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); - return batch; -} -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq); - -static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port); - -static void destroyBatchList(SCliBatchList* pList); -static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { - int32_t code = 0; - STrans* pInst = pThrd->pInst; - SReqCtx* pCtx = pReq->ctx; - - char* ip = EPSET_GET_INUSE_IP(pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - size_t klen = strlen(key); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); - if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = NULL; - code = createBatchList(&pBatchList, key, ip, port); - if (code != 0) { - destroyReq(pReq); - return; - } - - pBatchList->batchLenLimit = pInst->shareConnLimit; - - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, pBatchList, pReq); - if (code != 0) { - destroyBatchList(pBatchList); - destroyReq(pReq); - return; - } - - code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); - if (code != 0) { - destroyBatchList(pBatchList); - } - } else { - if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - cliDestroyBatch(pBatch); - } - } else { - queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); - SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->shareConnLimit + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { - QUEUE_PUSH(&pBatch->wq, h); - pBatch->shareConnLimit += pReq->msg.contLen; - pBatch->wLen += 1; - } else { - SCliBatch* tBatch = NULL; - code = createBatch(&tBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - } - } - } - } - return; -} -static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { - SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - if (pBatchList == NULL) { - tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - QUEUE_INIT(&pBatchList->wq); - pBatchList->port = port; - pBatchList->connMax = 1; - pBatchList->connCnt = 0; - pBatchList->batchLenLimit = 0; - pBatchList->len += 1; - - pBatchList->ip = taosStrdup(ip); - pBatchList->dst = taosStrdup(key); - if (pBatchList->ip == NULL || pBatchList->dst == NULL) { - taosMemoryFree(pBatchList->ip); - taosMemoryFree(pBatchList->dst); - taosMemoryFree(pBatchList); - tError("failed to create batch list since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - *ppBatchList = pBatchList; - return 0; -} -static void destroyBatchList(SCliBatchList* pList) { - if (pList == NULL) { - return; - } - while (!QUEUE_IS_EMPTY(&pList->wq)) { - queue* h = QUEUE_HEAD(&pList->wq); - QUEUE_REMOVE(h); - - SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); - cliDestroyBatch(pBatch); - } - taosMemoryFree(pList->ip); - taosMemoryFree(pList->dst); - taosMemoryFree(pList); -} -static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq) { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - if (pBatch == NULL) { - tError("failed to create batch since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - return terrno; - } - - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, &pReq->q); - pBatch->wLen += 1; - pBatch->shareConnLimit = pReq->msg.contLen; - pBatch->pList = pList; - - QUEUE_PUSH(&pList->wq, &pBatch->listq); - pList->len += 1; - - *ppBatch = pBatch; - return 0; -} static void cliDoBatchReq(queue* wq, SCliThrd* pThrd) { return cliDoReq(wq, pThrd); } static void cliAsyncCb(uv_async_t* handle) {