From a0abe6743271db820b10189182233659ab553220 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 23 May 2024 06:49:24 +0000 Subject: [PATCH] refactor tranport --- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/transCli.c | 105 ++++++++++++----------- 2 files changed, 57 insertions(+), 49 deletions(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 7853e25cff..232210b53b 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -69,6 +69,7 @@ typedef struct { int8_t connLimitLock; // 0: no lock. 1. lock int8_t supportBatch; // 0: no batch, 1: support batch int32_t batchSize; + int8_t optBatchFetch; int32_t timeToGetConn; int index; void* parent; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 023cb16b8b..f6e4c4ecf9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1213,23 +1213,67 @@ static void cliDestroyBatch(SCliBatch* pBatch) { p->sending -= 1; taosMemoryFree(pBatch); } + +static void cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { + STrans* pTransInst = pThrd->pTransInst; + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip); + if (ipaddr == 0xffffffff) { + cliResetConnTimer(conn); + cliHandleFastFail(conn, -1); + return; + } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = ipaddr; + addr.sin_port = (uint16_t)htons(port); + + tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); + int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); + if (fd == -1) { + tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + cliHandleFastFail(conn, -1); + return; + } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); + if (ret != 0) { + tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); + return; + } + ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); + if (ret != 0) { + tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); + cliHandleFastFail(conn, -1); + return; + } + + ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + if (ret != 0) { + cliResetConnTimer(conn); + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + cliHandleFastFail(conn, -1); + return; + } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); + return; +} static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { + return; + } + if (pThrd->quit == true) { cliDestroyBatch(pBatch); return; } - if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { - return; - } STrans* pTransInst = pThrd->pTransInst; SCliBatchList* pList = pBatch->pList; - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); - bool exceed = false; - SCliConn* conn = getConnFromPool(pThrd, key, &exceed); + SCliConn* conn = getConnFromPool(pThrd, pList->dst, &exceed); if (conn == NULL && exceed) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen, @@ -1241,48 +1285,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn = cliCreateConn(pThrd); conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); - - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); - if (ipaddr == 0xffffffff) { - cliResetConnTimer(conn); - cliHandleFastFail(conn, -1); - return; - } - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons(pList->port); - - tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); - if (fd == -1) { - tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - cliHandleFastFail(conn, -1); - return; - } - int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); - if (ret != 0) { - tError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); - cliHandleFastFail(conn, -1); - return; - } - ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); - if (ret != 0) { - tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); - cliHandleFastFail(conn, -1); - return; - } - - ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); - if (ret != 0) { - cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, -1); - return; - } - uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); - return; + return cliDoConn(pThrd, conn, pList->ip, pList->port); } conn->pBatch = pBatch; @@ -1803,6 +1806,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg->type == Normal) { + cliBuildBatch(pMsg, h, pThrd); + count++; + } if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { cliBuildBatch(pMsg, h, pThrd); continue;