support fail fast

This commit is contained in:
yihaoDeng 2022-12-06 10:30:14 +08:00
parent 4be196491e
commit 9ce9d21507
4 changed files with 16 additions and 5 deletions

View File

@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
return (*msgFp)(pWrapper->pMgmt, pMsg);
}
static bool dmFailFastFp(tmsg_t msgType) {
// add more msg type later
return msgType == TDMT_SYNC_HEARTBEAT;
}
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SDnodeTrans *pTrans = &pDnode->trans;
int32_t code = -1;
@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
rpcInit.failFastInterval = 1000; // interval threshold(ms)
rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
dError("failed to init dnode rpc client");

View File

@ -886,7 +886,8 @@ void cliConnCb(uv_connect_t* req, int status) {
tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip,
pConn->port, uv_strerror(status));
SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0);
if (REQUEST_NO_RESP(&pMsg->msg)) {
STrans* pTransInst = pThrd->pTransInst;
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = pConn->ip;
uint32_t port = pConn->port;
char key[TSDB_FQDN_LEN + 64] = {0};
@ -895,7 +896,7 @@ void cliConnCb(uv_connect_t* req, int status) {
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
if (cTimestamp - item->timestamp < ((STrans*)pThrd->pTransInst)->failFastInterval) {
if (cTimestamp - item->timestamp < pTransInst->failFastInterval) {
item->count++;
} else {
item->count = 1;
@ -1057,7 +1058,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
return;
}
if (REQUEST_NO_RESP(&pMsg->msg)) {
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
@ -1376,6 +1377,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosMemoryFree(pThrd);
}