feat: refactor rpc quit
This commit is contained in:
parent
ef50435851
commit
720645800c
|
@ -1014,42 +1014,36 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
|
||||||
if (pCtx->retryCnt == 0) {
|
|
||||||
pCtx->origEpSet = pCtx->epSet;
|
|
||||||
}
|
|
||||||
/*
|
/*
|
||||||
* no retry
|
* no retry
|
||||||
* 1. query conn
|
* 1. query conn
|
||||||
* 2. rpc thread already receive quit msg
|
* 2. rpc thread already receive quit msg
|
||||||
*/
|
*/
|
||||||
int32_t code = pResp->code;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
if (CONN_NO_PERSIST_BY_APP(pConn)) {
|
int32_t code = pResp->code;
|
||||||
if (pTransInst->retry != NULL && pTransInst->retry(code)) {
|
if (pTransInst->retry != NULL && pTransInst->retry(code)) {
|
||||||
pMsg->sent = 0;
|
pMsg->sent = 0;
|
||||||
pCtx->retryCnt += 1;
|
pCtx->retryCnt += 1;
|
||||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
|
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
||||||
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
|
addConnToPool(pThrd->pool, pConn);
|
||||||
|
if (pResp->contLen == 0) {
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
} else {
|
||||||
return -1;
|
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet);
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
|
||||||
addConnToPool(pThrd->pool, pConn);
|
|
||||||
if (pResp->contLen == 0) {
|
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
|
||||||
} else {
|
|
||||||
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet);
|
|
||||||
}
|
|
||||||
transFreeMsg(pResp->pCont);
|
|
||||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
transFreeMsg(pResp->pCont);
|
||||||
|
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1185,6 +1179,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
|
||||||
|
|
||||||
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx));
|
||||||
pCtx->epSet = *pEpSet;
|
pCtx->epSet = *pEpSet;
|
||||||
|
pCtx->origEpSet = *pEpSet;
|
||||||
pCtx->ahandle = pReq->info.ahandle;
|
pCtx->ahandle = pReq->info.ahandle;
|
||||||
pCtx->msgType = pReq->msgType;
|
pCtx->msgType = pReq->msgType;
|
||||||
pCtx->pSem = sem;
|
pCtx->pSem = sem;
|
||||||
|
|
Loading…
Reference in New Issue