diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 12aba130d5..04c12abcf9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -48,6 +48,11 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { return (*msgFp)(pWrapper->pMgmt, pMsg); } +static bool dmFailFastFp(tmsg_t msgType) { + // add more msg type later + return msgType == TDMT_SYNC_HEARTBEAT; +} + static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; @@ -260,6 +265,10 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + rpcInit.failFastInterval = 1000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3e9b27dc67..c74d9839c8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -886,7 +886,8 @@ void cliConnCb(uv_connect_t* req, int status) { tError("%s conn %p failed to connect to %s:%d, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->ip, pConn->port, uv_strerror(status)); SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); - if (REQUEST_NO_RESP(&pMsg->msg)) { + STrans* pTransInst = pThrd->pTransInst; + if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { char* ip = pConn->ip; uint32_t port = pConn->port; char key[TSDB_FQDN_LEN + 64] = {0}; @@ -895,7 +896,7 @@ void cliConnCb(uv_connect_t* req, int status) { SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { - if (cTimestamp - item->timestamp < ((STrans*)pThrd->pTransInst)->failFastInterval) { + if (cTimestamp - item->timestamp < pTransInst->failFastInterval) { item->count++; } else { item->count = 1; @@ -1057,7 +1058,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { return; } - if (REQUEST_NO_RESP(&pMsg->msg)) { + if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); char key[TSDB_FQDN_LEN + 64] = {0}; @@ -1376,6 +1377,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); taosMemoryFree(pThrd); } diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 68dfba14e9..cd4324a592 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -572,7 +572,7 @@ int32_t taosClockGetTime(int clock_id, struct timespec *pTS) { offsetInitFinished = true; } else { while (!offsetInitFinished) - ; // Ensure initialization is completed. + ; // Ensure initialization is completed. } GetSystemTimeAsFileTime(&f); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index fc9d90c985..be1db74f1a 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -496,7 +496,7 @@ void taosPrintLongString(const char *flags, ELogLevel level, int32_t dflag, cons if (!osLogSpaceAvailable()) return; if (!(dflag & DEBUG_FILE) && !(dflag & DEBUG_SCREEN)) return; - char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); + char *buffer = taosMemoryMalloc(LOG_MAX_LINE_DUMP_BUFFER_SIZE); int32_t len = taosBuildLogHead(buffer, flags); va_list argpointer;