From 77c7ee264c11e22f826aac50537896e79a5522b0 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 16 May 2024 11:42:14 +0800 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 95 +++++++++++++--------------- 1 file changed, 45 insertions(+), 50 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 97a15c7fc8..419c037a5d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -175,6 +175,7 @@ static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); +void cliResetConnTimer(SCliConn* conn); static SCliConn* cliCreateConn(SCliThrd* thrd); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); @@ -367,10 +368,8 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { return false; } -void cliHandleResp(SCliConn* conn) { +void cliResetConnTimer(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; - if (conn->timer) { if (uv_is_active((uv_handle_t*)conn->timer)) { tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); @@ -380,6 +379,40 @@ void cliHandleResp(SCliConn* conn) { conn->timer->data = NULL; conn->timer = NULL; } +} +void cliHandleBatchResp(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + cliResetConnTimer(conn); + + STransMsgHead* pHead = NULL; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { + tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); + } + pHead->code = htonl(pHead->code); + pHead->msgLen = htonl(pHead->msgLen); + + STransMsg transMsg = {0}; + transMsg.contLen = transContLenFromMsg(pHead->msgLen); + transMsg.pCont = transContFromHead((char*)pHead); + transMsg.code = pHead->code; + transMsg.msgType = pHead->msgType; + transMsg.info.ahandle = NULL; + transMsg.info.traceId = pHead->traceId; + transMsg.info.hasEpSet = pHead->hasEpSet; + transMsg.info.cliVer = htonl(pHead->compatibilityVer); + + SCliMsg* pMsg = NULL; + STransConnCtx* pCtx = pMsg->ctx; + if (cliAppCb(conn, &transMsg, pMsg) != 0) { + return; + } +} +void cliHandleResp(SCliConn* conn) { + SCliThrd* pThrd = conn->hostThrd; + STrans* pTransInst = pThrd->pTransInst; + cliResetConnTimer(conn); STransMsgHead* pHead = NULL; @@ -578,11 +611,7 @@ void cliConnTimeout(uv_timer_t* handle) { tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); - uv_timer_stop(handle); - handle->data = NULL; - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, UV_ECANCELED); } @@ -757,12 +786,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { allocConnRef(conn, true); SCliThrd* thrd = conn->hostThrd; - if (conn->timer != NULL) { - uv_timer_stop(conn->timer); - taosArrayPush(thrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } + cliResetConnTimer(conn); if (T_REF_VAL_GET(conn) > 1) { transUnrefCliHandle(conn); } @@ -948,12 +972,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transDQCancel(pThrd->timeoutQueue, conn->task); conn->task = NULL; } - if (conn->timer != NULL) { - uv_timer_stop(conn->timer); - conn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - } + cliResetConnTimer(conn); if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { @@ -968,12 +987,7 @@ static void cliDestroy(uv_handle_t* handle) { } SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; - if (conn->timer != NULL) { - uv_timer_stop(conn->timer); - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer->data = NULL; - conn->timer = NULL; - } + cliResetConnTimer(conn); transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -1230,11 +1244,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); if (ipaddr == 0xffffffff) { - uv_timer_stop(conn->timer); - conn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliHandleFastFail(conn, -1); return; } @@ -1266,11 +1276,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - uv_timer_stop(conn->timer); - conn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, -1); return; @@ -1366,10 +1372,7 @@ void cliConnCb(uv_connect_t* req, int status) { if (pConn->timer == NULL) { timeout = true; } else { - uv_timer_stop(pConn->timer); - pConn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &pConn->timer); - pConn->timer = NULL; + cliResetConnTimer(pConn); } if (status != 0) { @@ -1641,11 +1644,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { - uv_timer_stop(conn->timer); - conn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliHandleExcept(conn); return; } @@ -1679,11 +1678,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - uv_timer_stop(conn->timer); - conn->timer->data = NULL; - taosArrayPush(pThrd->timerList, &conn->timer); - conn->timer = NULL; - + cliResetConnTimer(conn); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, ret); return;