diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfd7630f35..86d6ef58ff 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -212,8 +212,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd); static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, NULL, cliHandleUpdate}; -/// static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, cliHandleQuit, cliHandleRelease, -/// NULL,cliHandleUpdate}; + +static void cliDealReq(queue* h, SCliThrd* pThrd); +static void cliBatchDealReq(queue* h, SCliThrd* pThrd); +static void (*cliDealFunc[])(queue* h, SCliThrd* pThrd) = {cliDealReq, cliBatchDealReq}; static FORCE_INLINE void destroyCmsg(void* cmsg); @@ -1695,7 +1697,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { tGTrace("%s conn %p ready", pTransInst->label, conn); } -static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { +static void cliDealReq(queue* wq, SCliThrd* pThrd) { int count = 0; while (!QUEUE_IS_EMPTY(wq)) { @@ -1709,7 +1711,6 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { continue; } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); - count++; } if (count >= 2) { @@ -1729,7 +1730,77 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); return batch; } +static void cliBuildBatch(SCliMsg* pMsg, queue* h, SCliThrd* pThrd) { + STrans* pInst = pThrd->pTransInst; + STransConnCtx* pCtx = pMsg->ctx; + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + size_t klen = strlen(key); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); + if (ppBatchList == NULL || *ppBatchList == NULL) { + SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); + QUEUE_INIT(&pBatchList->wq); + pBatchList->connMax = pInst->connLimitNum; + pBatchList->connCnt = 0; + pBatchList->batchLenLimit = pInst->batchSize; + pBatchList->len += 1; + + pBatchList->ip = taosStrdup(ip); + pBatchList->dst = taosStrdup(key); + pBatchList->port = port; + + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = pBatchList; + + QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); + + taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); + } else { + if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize = pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; + } + + queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); + SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); + if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { + QUEUE_PUSH(&pBatch->wq, h); + pBatch->batchSize += pMsg->msg.contLen; + pBatch->wLen += 1; + } else { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; + } + } + return; +} static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { STrans* pInst = pThrd->pTransInst; @@ -1746,75 +1817,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { } if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { - STransConnCtx* pCtx = pMsg->ctx; - - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - size_t klen = strlen(key); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); - if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); - QUEUE_INIT(&pBatchList->wq); - pBatchList->connMax = pInst->connLimitNum; - pBatchList->connCnt = 0; - pBatchList->batchLenLimit = pInst->batchSize; - pBatchList->len += 1; - - pBatchList->ip = taosStrdup(ip); - pBatchList->dst = taosStrdup(key); - pBatchList->port = port; - - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = pBatchList; - - QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - - taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); - } else { - if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize = pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; - - continue; - } - - queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); - SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { - QUEUE_PUSH(&pBatch->wq, h); - pBatch->batchSize += pMsg->msg.contLen; - pBatch->wLen += 1; - } else { - SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - QUEUE_INIT(&pBatch->wq); - QUEUE_INIT(&pBatch->listq); - - QUEUE_PUSH(&pBatch->wq, h); - pBatch->wLen += 1; - pBatch->batchSize += pMsg->msg.contLen; - pBatch->pList = *ppBatchList; - - QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); - (*ppBatchList)->len += 1; - } - } + cliBuildBatch(pMsg, h, pThrd); continue; } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); @@ -1847,12 +1850,7 @@ static void cliAsyncCb(uv_async_t* handle) { QUEUE_MOVE(&item->qmsg, &wq); taosThreadMutexUnlock(&item->mtx); - int8_t supportBatch = pTransInst->supportBatch; - if (supportBatch == 0) { - cliNoBatchDealReq(&wq, pThrd); - } else if (supportBatch == 1) { - cliBatchDealReq(&wq, pThrd); - } + cliDealFunc[pTransInst->supportBatch](&wq, pThrd); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); }