add simple retry
This commit is contained in:
parent
7e010dcee9
commit
ba23ed2317
|
@ -85,6 +85,11 @@ typedef struct SRpcInit {
|
||||||
int32_t retryLimit; // retry limit
|
int32_t retryLimit; // retry limit
|
||||||
int32_t retryInterval; // retry interval ms
|
int32_t retryInterval; // retry interval ms
|
||||||
|
|
||||||
|
int32_t retryMinInterval; // retry init interval
|
||||||
|
int32_t retryStepFactor; // retry interval factor
|
||||||
|
int32_t retryMaxInterval; // retry max interval
|
||||||
|
int32_t retryMaxTimouet;
|
||||||
|
|
||||||
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
|
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
|
||||||
int8_t encryption; // encrypt or not
|
int8_t encryption; // encrypt or not
|
||||||
|
|
||||||
|
|
|
@ -146,6 +146,15 @@ typedef struct {
|
||||||
SCvtAddr cvtAddr;
|
SCvtAddr cvtAddr;
|
||||||
bool setMaxRetry;
|
bool setMaxRetry;
|
||||||
|
|
||||||
|
int32_t retryMinInterval;
|
||||||
|
int32_t retryMaxInterval;
|
||||||
|
int32_t retryStepFactor;
|
||||||
|
int32_t retryMaxTimeout;
|
||||||
|
int64_t retryInitTimestamp;
|
||||||
|
int64_t retryNextInterval;
|
||||||
|
bool retryInit;
|
||||||
|
int32_t retryStep;
|
||||||
|
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
} STransConnCtx;
|
} STransConnCtx;
|
||||||
|
|
||||||
|
|
|
@ -52,6 +52,11 @@ typedef struct {
|
||||||
int32_t retryLimit; // retry limit
|
int32_t retryLimit; // retry limit
|
||||||
int32_t retryInterval; // retry interval ms
|
int32_t retryInterval; // retry interval ms
|
||||||
|
|
||||||
|
int32_t retryMinInterval; // retry init interval
|
||||||
|
int32_t retryStepFactor; // retry interval factor
|
||||||
|
int32_t retryMaxInterval; // retry max interval
|
||||||
|
int32_t retryMaxTimouet;
|
||||||
|
|
||||||
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
|
||||||
bool (*retry)(int32_t code, tmsg_t msgType);
|
bool (*retry)(int32_t code, tmsg_t msgType);
|
||||||
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
bool (*startTimer)(int32_t code, tmsg_t msgType);
|
||||||
|
|
|
@ -51,6 +51,11 @@ void* rpcOpen(const SRpcInit* pInit) {
|
||||||
pRpc->retryLimit = pInit->retryLimit;
|
pRpc->retryLimit = pInit->retryLimit;
|
||||||
pRpc->retryInterval = pInit->retryInterval;
|
pRpc->retryInterval = pInit->retryInterval;
|
||||||
|
|
||||||
|
pRpc->retryMinInterval = pInit->retryMinInterval; // retry init interval
|
||||||
|
pRpc->retryStepFactor = pInit->retryStepFactor;
|
||||||
|
pRpc->retryMaxInterval = pInit->retryMaxInterval;
|
||||||
|
pRpc->retryMaxTimouet = pInit->retryMaxTimouet;
|
||||||
|
|
||||||
// register callback handle
|
// register callback handle
|
||||||
pRpc->cfp = pInit->cfp;
|
pRpc->cfp = pInit->cfp;
|
||||||
pRpc->retry = pInit->rfp;
|
pRpc->retry = pInit->rfp;
|
||||||
|
|
|
@ -1371,7 +1371,8 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, pTransInst->retryInterval);
|
|
||||||
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||||
|
@ -1419,18 +1420,47 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
int32_t code = pResp->code;
|
int32_t code = pResp->code;
|
||||||
|
|
||||||
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
|
bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false;
|
||||||
|
if (retry == true) {
|
||||||
|
if (!pCtx->retryInit) {
|
||||||
|
pCtx->retryMinInterval = pTransInst->retryMinInterval;
|
||||||
|
pCtx->retryMaxInterval = pTransInst->retryMaxInterval;
|
||||||
|
pCtx->retryStepFactor = pTransInst->retryStepFactor;
|
||||||
|
pCtx->retryMaxTimeout = pTransInst->retryMaxTimouet;
|
||||||
|
pCtx->retryInit = true;
|
||||||
|
pCtx->retryStep = 1;
|
||||||
|
pCtx->retryInitTimestamp = taosGetTimestampMs();
|
||||||
|
pCtx->retryNextInterval = pCtx->retryMinInterval;
|
||||||
|
} else {
|
||||||
|
pCtx->retryStep++;
|
||||||
|
int64_t factor = 1;
|
||||||
|
for (int i = 0; i < pCtx->retryStep - 1; i++) {
|
||||||
|
factor *= pCtx->retryStepFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCtx->retryNextInterval = factor * pCtx->retryMinInterval;
|
||||||
|
if (pCtx->retryNextInterval >= pCtx->retryMaxInterval) {
|
||||||
|
pCtx->retryNextInterval = pCtx->retryMaxInterval;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
|
||||||
|
retry = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (retry) {
|
if (retry) {
|
||||||
pMsg->sent = 0;
|
pMsg->sent = 0;
|
||||||
pCtx->retryCnt += 1;
|
pCtx->retryCnt += 1;
|
||||||
|
|
||||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
||||||
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
// if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
transFreeMsg(pResp->pCont);
|
transFreeMsg(pResp->pCont);
|
||||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
//}
|
||||||
} else {
|
} else {
|
||||||
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit);
|
cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
|
|
Loading…
Reference in New Issue