diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 460b8962ea..95f70c8ff3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -78,6 +78,7 @@ typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *epset); typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcTfp)(int32_t code, tmsg_t msgType); typedef bool (*RpcFFfp)(tmsg_t msgType); +typedef bool (*RpcNoDelayfp)(tmsg_t msgType); typedef void (*RpcDfp)(void *ahandle); typedef struct SRpcInit { @@ -118,6 +119,8 @@ typedef struct SRpcInit { // fail fast fp RpcFFfp ffp; + RpcNoDelayfp noDelayFp; + int32_t connLimitNum; int32_t connLimitLock; int32_t timeToGetConn; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 916de6e715..03a024bb8c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -77,6 +77,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) +#define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 754c42b82e..a2355ddd22 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -330,7 +330,13 @@ static bool rpcRfp(int32_t code, tmsg_t msgType) { return false; } } - +static bool rpcNoDelayMsg(tmsg_t msgType) { + if (msgType == TDMT_VND_FETCH_TTL_EXPIRED_TBS || msgType == TDMT_VND_S3MIGRATE || + msgType == TDMT_VND_QUERY_COMPACT_PROGRESS || msgType == TDMT_VND_DROP_TTL_TABLE) { + return true; + } + return false; +} int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -356,6 +362,8 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold 
rpcInit.ffp = dmFailFastFp; + rpcInit.noDelayFp = rpcNoDelayMsg; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3) / 2; connLimitNum = TMAX(connLimitNum, 10); connLimitNum = TMIN(connLimitNum, 500); @@ -365,6 +373,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); pTrans->clientRpc = rpcOpen(&rpcInit); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index cc2c0d4e84..7853e25cff 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -63,6 +63,7 @@ typedef struct { bool (*startTimer)(int32_t code, tmsg_t msgType); void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); + bool (*noDelayFp)(tmsg_t msgType); int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f658947144..5ed2e00acd 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; + pRpc->noDelayFp = pInit->noDelayFp; pRpc->connLimitNum = pInit->connLimitNum; if (pRpc->connLimitNum == 0) { pRpc->connLimitNum = 20; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4da1f04cd9..dfd7630f35 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -204,7 +204,7 @@ static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code); // handle req from app 
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -617,7 +617,7 @@ void* destroyConnPool(SCliThrd* pThrd) { transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); pMsg->ctx->task = NULL; - doNotifyApp(pMsg, pThrd); + doNotifyApp(pMsg, pThrd, TSDB_CODE_RPC_MAX_SESSIONS); } taosMemoryFree(msglist); @@ -692,13 +692,20 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; + if (pTransInst->noDelayFp != NULL && pTransInst->noDelayFp((*pMsg)->msg.msgType)) { + tDebug("%s msg %s not to send, reason: %s", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType), + tstrerror(TSDB_CODE_RPC_NETWORK_BUSY)); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_RPC_NETWORK_BUSY); + *pMsg = NULL; + return NULL; + } + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = *pMsg; arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); - tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); - QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; } else { @@ -1394,14 +1401,14 @@ void cliConnCb(uv_connect_t* req, int status) { } } -static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd, int32_t code) { STransConnCtx* pCtx = pMsg->ctx; STrans* pTransInst = pThrd->pTransInst; STransMsg transMsg = {0}; transMsg.contLen = 0; transMsg.pCont = NULL; - transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; + transMsg.code = code; transMsg.msgType = pMsg->msg.msgType + 1; transMsg.info.ahandle = pMsg->ctx->ahandle; transMsg.info.traceId = pMsg->msg.info.traceId; @@ -1578,11 +1585,11 @@ static void doFreeTimeoutMsg(void* param) { SCliMsg* pMsg = arg->param1; SCliThrd* pThrd = arg->param2; STrans* 
pTransInst = pThrd->pTransInst; - + int32_t code = TSDB_CODE_RPC_MAX_SESSIONS; QUEUE_REMOVE(&pMsg->q); STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); - doNotifyApp(pMsg, pThrd); + doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ab5d3da781..3ef656b2b4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -58,6 +58,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")