diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 54d468a370..5b860cc23a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -125,6 +125,7 @@ typedef struct SRpcInit { int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; + int8_t notWaitAvaliableConn; // 1: wait to get, 0: no wait void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 3d758e1fd3..9bbfdd18b0 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -387,6 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + rpcInit.notWaitAvaliableConn = 1; (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 7853e25cff..703a4dde3e 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -58,6 +58,8 @@ typedef struct { int32_t failFastThreshold; int32_t failFastInterval; + int8_t notWaitAvaliableConn; // 1: no delay, 0: delay + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); bool (*retry)(int32_t code, tmsg_t msgType); bool (*startTimer)(int32_t code, tmsg_t msgType); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index b46a623493..8b99443a84 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -102,6 +102,8 @@ void* rpcOpen(const SRpcInit* pInit) { if (pRpc->timeToGetConn == 0) { pRpc->timeToGetConn = 10 * 1000; } + pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn; + pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 6f1ce6ad0b..4b324e52c6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -708,7 +708,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; - if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) { + if (pTransInst->notWaitAvaliableConn || (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType))) { tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY);