From 6d3b8c059462f96d7ff9a6c21b3e10fd5c35ce99 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Sep 2024 09:04:56 +0800 Subject: [PATCH] opt transport --- source/libs/transport/inc/transComm.h | 20 ++++++--------- source/libs/transport/src/transCli.c | 35 +++++++++++++++------------ 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c8ce8e5f45..e75cd51fc0 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -139,30 +139,26 @@ typedef struct SCvtAddr { typedef struct { SEpSet epSet; // ip list provided by app SEpSet origEpSet; - void* ahandle; // handle provided by app - tmsg_t msgType; // message type - int8_t connType; // connection type cli/srv + void* ahandle; // handle provided by app + tmsg_t msgType; // message type STransCtx userCtx; // STransMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API STransSyncMsg* pSyncMsg; // for syncchronous with timeout API int64_t syncMsgRef; - SCvtAddr cvtAddr; + SCvtAddr* pCvtAddr; + int64_t retryInitTimestamp; + int64_t retryNextInterval; + int64_t retryMaxTimeout; int32_t retryMinInterval; int32_t retryMaxInterval; int32_t retryStepFactor; - int64_t retryMaxTimeout; - int64_t retryInitTimestamp; - int64_t retryNextInterval; - bool retryInit; int32_t retryStep; - int8_t epsetRetryCnt; int32_t retryCode; - - void* task; - int hThrdIdx; + int8_t retryInit; + int8_t epsetRetryCnt; } SReqCtx; #pragma pack(push, 1) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 16afa3628c..fd17e163b8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -120,17 +120,13 @@ typedef struct { typedef struct SCliReq { SReqCtx* ctx; - STransMsg msg; queue q; STransMsgType type; + uint64_t st; + int64_t seq; + int32_t sent; //(0: no send, 1: alread sent) + STransMsg msg; - // int64_t refId; - uint64_t st; - int sent; //(0: no send, 1: alread sent) - queue seqq; - int64_t seq; - - queue qlist; } SCliReq; typedef struct SCliThrd { @@ -152,7 +148,7 @@ typedef struct SCliThrd { void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; - SCvtAddr cvtAddr; + SCvtAddr* pCvtAddr; SHashObj* failFastCache; SHashObj* batchCache; @@ -1474,12 +1470,18 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) { static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { return; } static void cliHandleUpdate(SCliThrd* pThrd, SCliReq* pReq) { SReqCtx* pCtx = pReq->ctx; - pThrd->cvtAddr = pCtx->cvtAddr; + if (pThrd->pCvtAddr != NULL) { + taosMemoryFreeClear(pThrd->pCvtAddr); + } + pThrd->pCvtAddr = pCtx->pCvtAddr; destroyReq(pReq); return; } FORCE_INLINE int32_t cliMayCvtFqdnToIp(SEpSet* pEpSet, const SCvtAddr* pCvtAddr) { + if (pCvtAddr == NULL) { + return 0; + } if (pCvtAddr->cvt == false) { if (EPSET_IS_VALID(pEpSet)) { return 0; @@ -2001,7 +2003,7 @@ int32_t initCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { if (pReq->ctx == NULL) { return 0; } - return cliMayCvtFqdnToIp(&pReq->ctx->epSet, &pThrd->cvtAddr); + return cliMayCvtFqdnToIp(&pReq->ctx->epSet, pThrd->pCvtAddr); } int32_t notifyExceptCb(void* thrd, SCliReq* pReq, STransMsg* pResp) { SCliThrd* pThrd = thrd; @@ -2201,6 +2203,8 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosHashCleanup(pThrd->pIdConnTable); + taosMemoryFree(pThrd->pCvtAddr); + taosMemoryFree(pThrd); } @@ -2426,7 +2430,7 @@ void cliRetryMayInitCtx(STrans* pInst, SCliReq* pReq) { pCtx->retryInitTimestamp = taosGetTimestampMs(); pCtx->retryNextInterval = pCtx->retryMinInterval; pCtx->retryStep = 0; - pCtx->retryInit = true; + pCtx->retryInit = 1; pCtx->retryCode = TSDB_CODE_SUCCESS; pReq->msg.info.handle = 0; } @@ -2756,7 +2760,6 @@ static int32_t transInitMsg(void* pInstRef, const SEpSet* pEpSet, STransMsg* pRe pCliReq->msg = *pReq; pCliReq->st = taosGetTimestampUs(); pCliReq->type = Normal; - QUEUE_INIT(&pCliReq->seqq); *pCliMsg = pCliReq; @@ -3023,7 +3026,6 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq pCliReq->msg = *pReq; pCliReq->st = taosGetTimestampUs(); pCliReq->type = Normal; - // pCliReq->refId = (int64_t)pInstRef; STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pInst), pThrd->pid, @@ -3083,10 +3085,12 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) { break; } - pCtx->cvtAddr = cvtAddr; + pCtx->pCvtAddr = (SCvtAddr*)taosMemoryCalloc(1, sizeof(SCvtAddr)); + memcpy(pCtx->pCvtAddr, &cvtAddr, sizeof(SCvtAddr)); SCliReq* pReq = taosMemoryCalloc(1, sizeof(SCliReq)); if (pReq == NULL) { + taosMemoryFree(pCtx->pCvtAddr); taosMemoryFree(pCtx); code = terrno; break; @@ -3099,6 +3103,7 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) { tDebug("%s update epset at thread:%08" PRId64, pInst->label, thrd->pid); if ((code = transAsyncSend(thrd->asyncPool, &(pReq->q))) != 0) { + taosMemoryFree(pCtx->pCvtAddr); destroyReq(pReq); if (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT) { code = TSDB_CODE_RPC_MODULE_QUIT;