diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 8cc37910fd..3083e70574 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -85,6 +85,11 @@ typedef struct SRpcInit { int32_t retryLimit; // retry limit int32_t retryInterval; // retry interval ms + int32_t retryMinInterval; // retry init interval + int32_t retryStepFactor; // retry interval factor + int32_t retryMaxInterval; // retry max interval + int32_t retryMaxTimouet; + int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int8_t encryption; // encrypt or not diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index ac54749ae1..1102ddd259 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -146,6 +146,15 @@ typedef struct { SCvtAddr cvtAddr; bool setMaxRetry; + int32_t retryMinInterval; + int32_t retryMaxInterval; + int32_t retryStepFactor; + int32_t retryMaxTimeout; + int64_t retryInitTimestamp; + int64_t retryNextInterval; + bool retryInit; + int32_t retryStep; + int hThrdIdx; } STransConnCtx; diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index c8a56081cc..833937aa41 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -52,6 +52,11 @@ typedef struct { int32_t retryLimit; // retry limit int32_t retryInterval; // retry interval ms + int32_t retryMinInterval; // retry init interval + int32_t retryStepFactor; // retry interval factor + int32_t retryMaxInterval; // retry max interval + int32_t retryMaxTimouet; + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 94bc128de9..415a8766e3 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -51,6 +51,11 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->retryLimit = pInit->retryLimit; pRpc->retryInterval = pInit->retryInterval; + pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval + pRpc->retryStepFactor = pInit->retryStepFactor; + pRpc->retryMaxInterval = pInit->retryMaxInterval; + pRpc->retryMaxTimouet = pInit->retryMaxTimouet; + // register callback handle pRpc->cfp = pInit->cfp; pRpc->retry = pInit->rfp; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4fb00b1a6d..7581611654 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1371,7 +1371,8 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; arg->param2 = pThrd; - transDQSched(pThrd->delayQueue, doDelayTask, arg, pTransInst->retryInterval); + + transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); } FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { @@ -1419,18 +1420,47 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { int32_t code = pResp->code; bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; + if (retry == true) { + if (!pCtx->retryInit) { + pCtx->retryMinInterval = pTransInst->retryMinInterval; + pCtx->retryMaxInterval = pTransInst->retryMaxInterval; + pCtx->retryStepFactor = pTransInst->retryStepFactor; + pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet; + pCtx->retryInit = true; + pCtx->retryStep = 1; + pCtx->retryInitTimestamp = taosGetTimestampMs(); + pCtx->retryNextInterval = pCtx->retryMinInterval; + } else { + pCtx->retryStep++; + int64_t factor = 1; + for (int i = 0; i < pCtx->retryStep - 1; i++) { + factor *= pCtx->retryStepFactor; + } + + pCtx->retryNextInterval = factor * pCtx->retryMinInterval; + if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) { + pCtx->retryNextInterval = pCtx->retryMaxInterval; + } + } + + if (taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) { + retry = false; + } + } + if (retry) { pMsg->sent = 0; pCtx->retryCnt += 1; + if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3); - if (pCtx->retryCnt < pCtx->retryLimit) { - transUnrefCliHandle(pConn); - EPSET_FORWARD_INUSE(&pCtx->epSet); - transFreeMsg(pResp->pCont); - cliSchedMsgToNextNode(pMsg, pThrd); - return -1; - } + // if (pCtx->retryCnt < pCtx->retryLimit) { + transUnrefCliHandle(pConn); + EPSET_FORWARD_INUSE(&pCtx->epSet); + transFreeMsg(pResp->pCont); + cliSchedMsgToNextNode(pMsg, pThrd); + return -1; + //} } else { cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit); if (pCtx->retryCnt < pCtx->retryLimit) {