update retry
This commit is contained in:
parent
a2b62cba42
commit
e54044de73
|
@ -272,17 +272,17 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
|
||||||
do { \
|
do { \
|
||||||
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
|
||||||
} while (0)
|
} while (0)
|
||||||
#define EPSET_DEBUG_STR(epSet, buf) \
|
#define EPSET_DEBUG_STR(epSet, tbuf) \
|
||||||
do { \
|
do { \
|
||||||
int len = snprintf(buf, sizeof(buf), "epset:{"); \
|
int len = snprintf(tbuf, sizeof(tbuf), "epset:{"); \
|
||||||
for (int i = 0; i < (epSet)->numOfEps; i++) { \
|
for (int i = 0; i < (epSet)->numOfEps; i++) { \
|
||||||
if (i == (epSet)->numOfEps - 1) { \
|
if (i == (epSet)->numOfEps - 1) { \
|
||||||
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
||||||
} else { \
|
} else { \
|
||||||
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
len += snprintf(tbuf + len, sizeof(tbuf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
|
||||||
} \
|
} \
|
||||||
} \
|
} \
|
||||||
len += snprintf(buf + len, sizeof(buf) - len, "}"); \
|
len += snprintf(tbuf + len, sizeof(tbuf) - len, "}, inUse:%d", (epSet)->inUse); \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
static void* cliWorkThread(void* arg);
|
static void* cliWorkThread(void* arg);
|
||||||
|
@ -989,9 +989,10 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
STraceId* trace = &pMsg->msg.info.traceId;
|
STraceId* trace = &pMsg->msg.info.traceId;
|
||||||
STransConnCtx* pCtx = pMsg->ctx;
|
STransConnCtx* pCtx = pMsg->ctx;
|
||||||
|
|
||||||
char buf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
EPSET_DEBUG_STR(&pCtx->epSet, buf);
|
EPSET_DEBUG_STR(&pCtx->epSet, tbuf);
|
||||||
tGTrace("%s %s, retryCnt:%d, limit:%d", transLabel(pThrd), buf, pCtx->retryCnt + 1, pCtx->retryLimit);
|
tGTrace("%s retry to send msg to next node %dms later , use %s, retryCnt:%d, limit:%d", transLabel(pThrd->pTransInst),
|
||||||
|
TRANS_RETRY_INTERVAL, tbuf, pCtx->retryCnt + 1, pCtx->retryLimit);
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
|
||||||
arg->param1 = pMsg;
|
arg->param1 = pMsg;
|
||||||
|
@ -1029,18 +1030,17 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
||||||
pMsg->sent = 0;
|
pMsg->sent = 0;
|
||||||
pCtx->retryCnt += 1;
|
pCtx->retryCnt += 1;
|
||||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||||
transUnrefCliHandle(pConn);
|
|
||||||
|
|
||||||
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
|
transUnrefCliHandle(pConn);
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
cliSchedMsgToNextNode(pMsg, pThrd);
|
cliSchedMsgToNextNode(pMsg, pThrd);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
addConnToPool(pThrd->pool, pConn);
|
|
||||||
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
|
||||||
if (pCtx->retryCnt < pCtx->retryLimit) {
|
if (pCtx->retryCnt < pCtx->retryLimit) {
|
||||||
|
addConnToPool(pThrd->pool, pConn);
|
||||||
if (pResp->contLen == 0) {
|
if (pResp->contLen == 0) {
|
||||||
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
EPSET_FORWARD_INUSE(&pCtx->epSet);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue