refactor code
This commit is contained in:
parent
80188f9213
commit
3378ff0fb8
|
@ -151,8 +151,8 @@ typedef struct {
|
||||||
int64_t retryNextInterval;
|
int64_t retryNextInterval;
|
||||||
bool retryInit;
|
bool retryInit;
|
||||||
int32_t retryStep;
|
int32_t retryStep;
|
||||||
|
int8_t epsetRetryCnt;
|
||||||
int8_t epsetRetryCnt;
|
int32_t retryCode;
|
||||||
|
|
||||||
int hThrdIdx;
|
int hThrdIdx;
|
||||||
} STransConnCtx;
|
} STransConnCtx;
|
||||||
|
|
|
@ -1020,7 +1020,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
|
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
tDebug("current epset %s", tbuf);
|
|
||||||
|
|
||||||
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
if (!EPSET_IS_VALID(&pCtx->epSet)) {
|
||||||
tError("invalid epset");
|
tError("invalid epset");
|
||||||
|
@ -1500,34 +1499,46 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
pCtx->retryNextInterval = pCtx->retryMinInterval;
|
pCtx->retryNextInterval = pCtx->retryMinInterval;
|
||||||
pCtx->retryStep = 0;
|
pCtx->retryStep = 0;
|
||||||
pCtx->retryInit = true;
|
pCtx->retryInit = true;
|
||||||
|
pCtx->retryCode = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
|
if (-1 != pCtx->retryMaxTimeout && taosGetTimestampMs() - pCtx->retryInitTimestamp >= pCtx->retryMaxTimeout) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// code, msgType
|
||||||
|
|
||||||
|
// A: epset, leader, not self
|
||||||
|
// B: epset, not know leader
|
||||||
|
// C: no epset, leader but not serivce
|
||||||
|
|
||||||
bool noDelay = false;
|
bool noDelay = false;
|
||||||
if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
|
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
|
||||||
noDelay = cliResetEpset(pCtx, pResp, false);
|
noDelay = cliResetEpset(pCtx, pResp, false);
|
||||||
transFreeMsg(pResp->pCont);
|
transFreeMsg(pResp->pCont);
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
|
} else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR ||
|
||||||
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) {
|
code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) {
|
||||||
tDebug("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
|
tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen);
|
||||||
noDelay = cliResetEpset(pCtx, pResp, true);
|
noDelay = cliResetEpset(pCtx, pResp, true);
|
||||||
transFreeMsg(pResp->pCont);
|
transFreeMsg(pResp->pCont);
|
||||||
addConnToPool(pThrd->pool, pConn);
|
addConnToPool(pThrd->pool, pConn);
|
||||||
} else if (code == TSDB_CODE_SYN_RESTORING) {
|
} else if (code == TSDB_CODE_SYN_RESTORING) {
|
||||||
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
|
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
|
||||||
noDelay = cliResetEpset(pCtx, pResp, false);
|
noDelay = cliResetEpset(pCtx, pResp, false);
|
||||||
addConnToPool(pThrd->pool, pConn);
|
addConnToPool(pThrd->pool, pConn);
|
||||||
transFreeMsg(pResp->pCont);
|
transFreeMsg(pResp->pCont);
|
||||||
} else {
|
} else {
|
||||||
tDebug("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
|
tTrace("code str %s, contlen:%d 0", tstrerror(code), pResp->contLen);
|
||||||
noDelay = cliResetEpset(pCtx, pResp, false);
|
noDelay = cliResetEpset(pCtx, pResp, false);
|
||||||
addConnToPool(pThrd->pool, pConn);
|
addConnToPool(pThrd->pool, pConn);
|
||||||
transFreeMsg(pResp->pCont);
|
transFreeMsg(pResp->pCont);
|
||||||
}
|
}
|
||||||
|
if (code != TSDB_CODE_RPC_BROKEN_LINK && code != TSDB_CODE_RPC_NETWORK_UNAVAIL && code != TSDB_CODE_SUCCESS) {
|
||||||
|
// save one internal code
|
||||||
|
pCtx->retryCode = code;
|
||||||
|
}
|
||||||
|
|
||||||
if (noDelay == false) {
|
if (noDelay == false) {
|
||||||
pCtx->epsetRetryCnt = 1;
|
pCtx->epsetRetryCnt = 1;
|
||||||
|
@ -1556,29 +1567,36 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
|
|
||||||
if (pMsg == NULL || pMsg->ctx == NULL) {
|
if (pMsg == NULL || pMsg->ctx == NULL) {
|
||||||
tDebug("%s conn %p handle resp", pTransInst->label, pConn);
|
tTrace("%s conn %p handle resp", pTransInst->label, pConn);
|
||||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
int32_t code = pResp->code;
|
|
||||||
|
|
||||||
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
|
bool retry = cliGenRetryRule(pConn, pResp, pMsg);
|
||||||
if (retry == true) {
|
if (retry == true) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
STraceId* trace = &pResp->info.traceId;
|
|
||||||
|
|
||||||
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
|
if (pCtx->retryCode != TSDB_CODE_SUCCESS) {
|
||||||
|
int32_t code = pResp->code;
|
||||||
|
// return internal code app
|
||||||
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
||||||
|
pResp->code = pCtx->retryCode;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
STraceId* trace = &pResp->info.traceId;
|
||||||
|
bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet);
|
||||||
if (hasEpSet) {
|
if (hasEpSet) {
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
tGDebug("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx->pSem != NULL) {
|
if (pCtx->pSem != NULL) {
|
||||||
tGDebug("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
if (pCtx->pRsp == NULL) {
|
if (pCtx->pRsp == NULL) {
|
||||||
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1587,11 +1605,11 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
tsem_post(pCtx->pSem);
|
tsem_post(pCtx->pSem);
|
||||||
pCtx->pRsp = NULL;
|
pCtx->pRsp = NULL;
|
||||||
} else {
|
} else {
|
||||||
tGDebug("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||||
if (retry == false && hasEpSet == true) {
|
if (retry == false && hasEpSet == true) {
|
||||||
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
|
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
|
||||||
} else {
|
} else {
|
||||||
if (!cliIsEpsetUpdated(code, pCtx)) {
|
if (!cliIsEpsetUpdated(pResp->code, pCtx)) {
|
||||||
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
pTransInst->cfp(pTransInst->parent, pResp, NULL);
|
||||||
} else {
|
} else {
|
||||||
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
|
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
|
||||||
|
|
Loading…
Reference in New Issue