Merge pull request #12041 from taosdata/fix/retry_bug
fix(retry): fix retry count
This commit is contained in:
commit
27798de8db
|
@ -254,8 +254,17 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
|
||||||
|
|
||||||
epSet.eps[i].port = htons(epSet.eps[i].port);
|
epSet.eps[i].port = htons(epSet.eps[i].port);
|
||||||
}
|
}
|
||||||
|
SRpcMsg resp;
|
||||||
|
SMEpSet msg = {.epSet = epSet};
|
||||||
|
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
|
||||||
|
resp.pCont = rpcMallocCont(len);
|
||||||
|
resp.contLen = len;
|
||||||
|
tSerializeSMEpSet(resp.pCont, len, &msg);
|
||||||
|
|
||||||
rpcSendRedirectRsp(pReq->handle, &epSet);
|
resp.code = TSDB_CODE_RPC_REDIRECT;
|
||||||
|
resp.handle = pReq->handle;
|
||||||
|
resp.refId = pReq->refId;
|
||||||
|
rpcSendResponse(&resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
|
static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) {
|
||||||
|
|
|
@ -101,18 +101,8 @@ void* rpcReallocCont(void* ptr, int contLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
|
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
|
||||||
SRpcMsg rpcMsg;
|
// deprecated api
|
||||||
memset(&rpcMsg, 0, sizeof(rpcMsg));
|
assert(0);
|
||||||
|
|
||||||
SMEpSet msg = {.epSet = *pEpSet};
|
|
||||||
int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
|
|
||||||
rpcMsg.pCont = rpcMallocCont(len);
|
|
||||||
tSerializeSMEpSet(rpcMsg.pCont, len, &msg);
|
|
||||||
|
|
||||||
rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
|
|
||||||
rpcMsg.handle = thandle;
|
|
||||||
|
|
||||||
rpcSendResponse(&rpcMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
|
||||||
|
|
|
@ -914,8 +914,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
tmsg_t msgType = pCtx->msgType;
|
tmsg_t msgType = pCtx->msgType;
|
||||||
if ((pTransInst->retry != NULL && (pTransInst->retry(pResp->code))) ||
|
if ((pTransInst->retry != NULL && (pTransInst->retry(pResp->code))) ||
|
||||||
((pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgType == TDMT_MND_CONNECT)) {
|
((pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) && msgType == TDMT_MND_CONNECT)) {
|
||||||
pCtx->retryCount += 1;
|
pMsg->sent = 0;
|
||||||
pMsg->st = taosGetTimestampUs();
|
pMsg->st = taosGetTimestampUs();
|
||||||
|
pCtx->retryCount += 1;
|
||||||
if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (msgType == TDMT_MND_CONNECT && pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
if (pCtx->retryCount < pEpSet->numOfEps) {
|
if (pCtx->retryCount < pEpSet->numOfEps) {
|
||||||
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
|
||||||
|
@ -936,12 +937,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
tDeserializeSMEpSet(pResp->pCont, pResp->contLen, &emsg);
|
||||||
pCtx->epSet = emsg.epSet;
|
pCtx->epSet = emsg.epSet;
|
||||||
}
|
}
|
||||||
|
addConnToPool(pThrd, pConn);
|
||||||
|
tTrace("use remote epset, current in use: %d, retry count%d, try limit: %d", pEpSet->inUse, pCtx->retryCount + 1,
|
||||||
|
TRANS_RETRY_COUNT_LIMIT);
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
arg->param2 = pThrd;
|
arg->param2 = pThrd;
|
||||||
|
|
||||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
addConnToPool(pThrd, pConn);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue