diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index de3c2a9f52..ff68b72fc2 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -112,6 +112,8 @@ typedef struct SRpcInit { // fail fast fp RpcFFfp ffp; + int32_t connLimit; + void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dcb63f6524..dc539ac15e 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -284,6 +284,8 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.failFastThreshold = 3; // failed threshold rpcInit.ffp = dmFailFastFp; + rpcInit.connLimit = 7500; + pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { dError("failed to init dnode rpc client"); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 2db4a72795..92477bb514 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,6 +64,7 @@ typedef struct { void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); + int32_t connLimit; int index; void* parent; void* tcphandle; // returned handle from TCP initialization diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 47b1ac5ca7..61ca9743b3 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -67,6 +67,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->startTimer = pInit->tfp; pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; + pRpc->connLimit = pInit->connLimit; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d8ea21c335..dfbc8a5af2 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -80,6 +80,7 @@ typedef struct SCliThrd { uint64_t nextTimeout; // next timeout void* pTransInst; // + int connCount; void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; @@ -671,7 +672,6 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - // transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { @@ -694,6 +694,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->broken = 0; transRefCliHandle(conn); + atomic_add_fetch_32(&pThrd->connCount, 1); allocConnRef(conn, false); return conn; @@ -738,6 +739,8 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer = NULL; } + atomic_sub_fetch_32(&pThrd->connCount, 1); + transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); taosMemoryFree(conn->ip); @@ -1861,6 +1864,13 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran return TSDB_CODE_RPC_BROKEN_LINK; } + // read only + if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_MAX_SESSIONS; + } + TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); @@ -1902,6 +1912,13 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } + // not limit sync req + // read only + // if (pTransInst->connLimit != 0 && atomic_load_32(&pThrd->connCount) >= pTransInst->connLimit) { + // transFreeMsg(pReq->pCont); + // transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + // return TSDB_CODE_RPC_MAX_SESSIONS; + //} tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_init(sem, 0, 0); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 57b1998155..c07fa88af5 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -52,6 +52,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_PORT_EADDRINUSE, "Port already in use") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")