feat: refactor rpc quit

This commit is contained in:
yihaoDeng 2022-06-24 16:21:01 +08:00
parent 87f4f5364f
commit 445d7f2d90
2 changed files with 82 additions and 60 deletions

View File

@ -85,7 +85,8 @@ void closeTransporter(STscObj *pTscObj) {
}
static bool clientRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
code == TSDB_CODE_SYN_NOT_LEADER) {
return true;
} else {
return false;

View File

@ -111,6 +111,13 @@ static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle o
static void cliDestroy(uv_handle_t* handle);
static void cliSend(SCliConn* pConn);
static bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) {
if (code != 0) return false;
if (pCtx->retryCnt == 0) return false;
if (transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) return false;
return true;
}
void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr);
/*
* set TCP connection timeout per-socket level
@ -154,7 +161,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
destroyCmsg(pMsg);
}
}
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
@ -183,7 +189,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
int status = conn->status; \
uint64_t ahandle = head->ahandle; \
CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \
transClearBuffer(&conn->readBuf); \
@ -194,9 +199,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} \
destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \
if (status != ConnInPool) { \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
} \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
return; \
} \
} while (0)
@ -262,8 +265,25 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->info.persistHandle == 1)
#define REQUEST_RELEASE_HANDLE(cmsg) ((cmsg)->type == Release)
#define EPSET_GET_SIZE(epSet) (epSet)->numOfEps
#define EPSET_GET_INUSE_IP(epSet) ((epSet)->eps[(epSet)->inUse].fqdn)
#define EPSET_GET_INUSE_PORT(epSet) ((epSet)->eps[(epSet)->inUse].port)
#define EPSET_FORWARD_INUSE(epSet) \
do { \
(epSet)->inUse = (++((epSet)->inUse)) % ((epSet)->numOfEps); \
} while (0)
#define EPSET_DEBUG_STR(epSet, buf) \
do { \
int len = snprintf(buf, sizeof(buf), "epset:{"); \
for (int i = 0; i < (epSet)->numOfEps; i++) { \
if (i == (epSet)->numOfEps - 1) { \
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
} else { \
len += snprintf(buf + len, sizeof(buf) - len, "%d. %s:%d, ", i, (epSet)->eps[i].fqdn, (epSet)->eps[i].port); \
} \
} \
len += snprintf(buf + len, sizeof(buf) - len, "}"); \
} while (0);
static void* cliWorkThread(void* arg);
@ -492,6 +512,10 @@ static void allocConnRef(SCliConn* conn, bool update) {
conn->refId = exh->refId;
}
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
assert(0);
return;
}
SCliThrd* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
@ -505,7 +529,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
char key[128] = {0};
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("%s conn %p added to conn pool, read buf cap: %d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before
@ -751,9 +775,9 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd) {
STransConnCtx* pCtx = pMsg->ctx;
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
if (conn != NULL) {
tTrace("%s conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn);
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
} else {
tTrace("%s not found conn in conn pool %p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool);
}
return conn;
}
@ -773,7 +797,8 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
transPrintEpSet(&pCtx->epSet);
// transPrintEpSet(&pCtx->epSet);
SCliConn* conn = cliGetConn(pMsg, pThrd);
if (conn != NULL) {
transCtxMerge(&conn->ctx, &pCtx->appCtx);
@ -955,11 +980,30 @@ static void doDelayTask(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
cliHandleReq(pMsg, pThrd);
taosMemoryFree(arg);
cliHandleReq(pMsg, pThrd);
}
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STraceId* trace = &pMsg->msg.info.traceId;
STransConnCtx* pCtx = pMsg->ctx;
char buf[256] = {0};
EPSET_DEBUG_STR(&pCtx->epSet, buf);
tGTrace("%s %s, retryCnt:%d, limit:%d", transLabel(pThrd), buf, pCtx->retryCnt + 1, pCtx->retryLimit);
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
}
void cliUpdateRetryLimit(int8_t* val, int8_t exp, int8_t newVal) {
if (*val != exp) {
*val = newVal;
}
}
int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SCliThrd* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
@ -971,68 +1015,45 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
}
STransConnCtx* pCtx = pMsg->ctx;
SEpSet* pEpSet = &pCtx->epSet;
if (pCtx->retryCount == 0) {
if (pCtx->retryCnt == 0) {
pCtx->origEpSet = pCtx->epSet;
}
/*
* no retry
* 1. query conn
* 2. rpc thread already receive quit msg
*/
if (CONN_NO_PERSIST_BY_APP(pConn) && pThrd->quit == false) {
tmsg_t msgType = pCtx->msgType;
if ((pTransInst->retry != NULL && pEpSet->numOfEps > 1 && (pTransInst->retry(pResp->code))) ||
(pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pResp->code == TSDB_CODE_APP_NOT_READY ||
pResp->code == TSDB_CODE_NODE_NOT_DEPLOYED || pResp->code == TSDB_CODE_SYN_NOT_LEADER)) {
int32_t code = pResp->code;
if (CONN_NO_PERSIST_BY_APP(pConn)) {
if (pTransInst->retry != NULL && pTransInst->retry(code)) {
pMsg->sent = 0;
tTrace("try to send req to next node");
pMsg->st = taosGetTimestampUs();
pCtx->retryCnt += 1;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
transUnrefCliHandle(pConn);
pCtx->retryCount += 1;
if (pResp->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
if (pCtx->retryCount < pEpSet->numOfEps * 3) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
transPrintEpSet(pEpSet);
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, pEpSet->numOfEps * 3);
transUnrefCliHandle(pConn);
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3);
if (pCtx->retryCnt < pCtx->retryLimit) {
EPSET_FORWARD_INUSE(&pCtx->epSet);
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
}
} else if (pCtx->retryCount < TRANS_RETRY_COUNT_LIMIT) {
if (pResp->contLen == 0) {
pEpSet->inUse = (++pEpSet->inUse) % pEpSet->numOfEps;
transPrintEpSet(&pCtx->epSet);
tTrace("%s use local epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
} else {
SEpSet epSet = {0};
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &epSet);
pCtx->epSet = epSet;
} else {
addConnToPool(pThrd->pool, pConn);
transPrintEpSet(&pCtx->epSet);
tTrace("%s use remote epset, inUse: %d, retry count:%d, limit: %d", pTransInst->label, pEpSet->inUse,
pCtx->retryCount + 1, TRANS_RETRY_COUNT_LIMIT);
cliUpdateRetryLimit(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT);
if (pCtx->retryCnt < pCtx->retryLimit) {
if (pResp->contLen == 0) {
EPSET_FORWARD_INUSE(&pCtx->epSet);
} else {
tDeserializeSEpSet(pResp->pCont, pResp->contLen, &pCtx->epSet);
}
cliSchedMsgToNextNode(pMsg, pThrd);
return -1;
}
if (pConn->status != ConnInPool) {
addConnToPool(pThrd->pool, pConn);
}
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL);
return -1;
}
}
}
STraceId* trace = &pResp->info.traceId;
if (pCtx->pSem != NULL) {
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
@ -1045,10 +1066,10 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pCtx->pRsp = NULL;
} else {
tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn);
if (pResp->code != 0 || pCtx->retryCount == 0 || transEpSetIsEqual(&pCtx->epSet, &pCtx->origEpSet)) {
if (!cliIsEpsetUpdated(code, pCtx)) {
pTransInst->cfp(pTransInst->parent, pResp, NULL);
} else {
pTransInst->cfp(pTransInst->parent, pResp, pEpSet);
pTransInst->cfp(pTransInst->parent, pResp, &pCtx->epSet);
}
}
return 0;