handle redirect
This commit is contained in:
parent
ed3753b32b
commit
aa8957bcc5
|
@ -513,7 +513,7 @@ static void allocConnRef(SCliConn* conn, bool update) {
|
||||||
}
|
}
|
||||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
if (conn->status == ConnInPool) {
|
if (conn->status == ConnInPool) {
|
||||||
assert(0);
|
// assert(0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SCliThrd* thrd = conn->hostThrd;
|
SCliThrd* thrd = conn->hostThrd;
|
||||||
|
@ -986,13 +986,13 @@ static void doDelayTask(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
char tbuf[256] = {0};
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
|
char tbuf[256] = {0};
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
tGTrace("%s retry to send msg to next node %dms later , use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst),
|
tGTrace("%s retry on next node, use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst), tbuf,
|
||||||
TRANS_RETRY_INTERVAL, tbuf, pCtx->retryCnt + 1, pCtx->retryLimit);
|
pCtx->retryCnt + 1, pCtx->retryLimit);
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
|
@ -1000,7 +1000,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cliUpdateRetryLimit(int8_t* val, int8_t exp, int8_t newVal) {
|
void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
|
||||||
if (*val != exp) {
|
if (*val != exp) {
|
||||||
*val = newVal;
|
*val = newVal;
|
||||||
}
|
}
|
||||||
|
@ -1025,7 +1025,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
pMsg->sent = 0;
|
pMsg->sent = 0;
|
||||||
pCtx->retryCnt += 1;
|
pCtx->retryCnt += 1;
|
||||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
transUnrefCliHandle(pConn);
|
transUnrefCliHandle(pConn);
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
|
@ -1033,7 +1033,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
addConnToPool(pThrd->pool, pConn);
|
addConnToPool(pThrd->pool, pConn);
|
||||||
if (pResp->contLen == 0) {
|
if (pResp->contLen == 0) {
|
||||||
|
|
|
@ -422,6 +422,7 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
|
||||||
transUnrefSrvHandle(pConn);
|
transUnrefSrvHandle(pConn);
|
||||||
} else {
|
} else {
|
||||||
pHead->msgType = pMsg->msgType;
|
pHead->msgType = pMsg->msgType;
|
||||||
|
if (pHead->msgType == 0) pHead->msgType = pConn->inType + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue