handle except

This commit is contained in:
yihaoDeng 2022-06-23 20:51:09 +08:00
parent 23d1e55718
commit ffd105d0e0
2 changed files with 32 additions and 29 deletions

View File

@ -393,7 +393,6 @@ void cliHandleExcept(SCliConn* pConn) {
transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; transMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; transMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0;
transMsg.info.ahandle = NULL; transMsg.info.ahandle = NULL;
transMsg.info.handle = pConn;
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.info.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
@ -987,10 +986,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
if (pCtx->retryCount == 0) { if (pCtx->retryCount == 0) {
pCtx->origEpSet = pCtx->epSet; pCtx->origEpSet = pCtx->epSet;
} }
/* /*
* upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL * upper layer handle retry if code equal TSDB_CODE_RPC_NETWORK_UNAVAIL
*/ */
/*
* no retry
* 1. query conn 2. rpc thread already receive quit msg
*
*/
if (CONN_NO_PERSIST_BY_APP(pConn) && pThrd->quit == false) { if (CONN_NO_PERSIST_BY_APP(pConn) && pThrd->quit == false) {
tmsg_t msgType = pCtx->msgType; tmsg_t msgType = pCtx->msgType;
if ((pTransInst->retry != NULL && pEpSet->numOfEps > 1 && (pTransInst->retry(pResp->code))) || if ((pTransInst->retry != NULL && pEpSet->numOfEps > 1 && (pTransInst->retry(pResp->code))) ||
@ -1014,31 +1017,31 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
return -1; return -1;
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
transPrintEpSet(&pCtx->epSet);
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
} else {
SEpSet epSet = {0};
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = epSet;
transPrintEpSet(&pCtx->epSet);
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
}
if (pConn->status != ConnInPool) {
addConnToPool(pThrd->pool, pConn);
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
return -1;
} }
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
transPrintEpSet(&pCtx->epSet);
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
} else {
SEpSet epSet = {0};
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = epSet;
transPrintEpSet(&pCtx->epSet);
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
}
if (pConn->status != ConnInPool) {
addConnToPool(pThrd->pool, pConn);
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
return -1;
} }
} }
} }

View File

@ -455,16 +455,16 @@ void transPrintEpSet(SEpSet* pEpSet) {
return; return;
} }
char buf[512] = {0}; char buf[512] = {0};
int len = snprintf(buf, sizeof(buf), "epset:{ "); int len = snprintf(buf, sizeof(buf), "epset:{");
for (int i = 0; i < pEpSet->numOfEps; i++) { for (int i = 0; i < pEpSet->numOfEps; i++) {
if (i == pEpSet->numOfEps - 1) { if (i == pEpSet->numOfEps - 1) {
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} else { } else {
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
} }
} }
len += snprintf(buf + len, sizeof(buf) - len, "}"); len += snprintf(buf + len, sizeof(buf) - len, "}");
tTrace("%s, inUse: %d", buf, pEpSet->inUse); tTrace("%s, inUse:%d", buf, pEpSet->inUse);
} }
bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) { if (a->numOfEps != b->numOfEps || a->inUse != b->inUse) {