diff --git a/include/util/tdef.h b/include/util/tdef.h index a750074953..46a0d01457 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -499,7 +499,7 @@ typedef enum ELogicConditionType { #ifdef WINDOWS #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #else -#define TSDB_MAX_RPC_THREADS 10 +#define TSDB_MAX_RPC_THREADS 50 #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 84f0ffc8cb..95ea6944e3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -272,19 +272,19 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); } \ } while (0) -#define ASYNC_CHECK_HANDLE(exh1, id) \ - do { \ - if (id > 0) { \ - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ - if (exh2 == NULL || id != exh2->refId) { \ - tDebug("ref:%" PRId64 " already released" PRIu64, id); \ - code = terrno; \ - goto _return1; \ - } \ - } else { \ - tWarn("invalid handle to release"); \ - goto _return2; \ - } \ +#define ASYNC_CHECK_HANDLE(exh1, id) \ + do { \ + if (id > 0) { \ + SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), id); \ + if (exh2 == NULL || id != exh2->refId) { \ + tDebug("ref:%" PRId64 " already released", id); \ + code = terrno; \ + goto _return1; \ + } \ + } else { \ + tDebug("invalid handle to release"); \ + goto _return2; \ + } \ } while (0) int32_t transInitBuffer(SConnBuffer* buf); @@ -443,6 +443,7 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId); void transDestroyExHandle(void* handle); int32_t transGetRefMgt(); +int32_t transGetSvrRefMgt(); int32_t transGetInstMgt(); int32_t transGetSyncMsgMgt(); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1758ca65cc..1329489be6 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -20,6 +20,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; +static int32_t svrRefMgt; static int32_t instMgt; static int32_t transSyncMsgMgt; @@ -704,12 +705,14 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) { static void transInitEnv() { refMgt = transOpenRefMgt(50000, transDestroyExHandle); + svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle); instMgt = taosOpenRef(50, rpcCloseImpl); transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg); (void)uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } static void transDestroyEnv() { transCloseRefMgt(refMgt); + transCloseRefMgt(svrRefMgt); transCloseRefMgt(instMgt); transCloseRefMgt(transSyncMsgMgt); } @@ -724,6 +727,7 @@ int32_t transInit() { } int32_t transGetRefMgt() { return refMgt; } +int32_t transGetSvrRefMgt() { return svrRefMgt; } int32_t transGetInstMgt() { return instMgt; } int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 53a7dee7be..11aa468b19 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -373,6 +373,7 @@ static bool uvHandleReq(SSvrConn* pConn) { STrans* pTransInst = pConn->pTransInst; SWorkThrd* pThrd = pConn->hostThrd; + int8_t acquire = 0; STransMsgHead* pHead = NULL; int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; @@ -459,7 +460,13 @@ static bool uvHandleReq(SSvrConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId); + + if (pHead->noResp == 1) { + transMsg.info.handle = NULL; + } else { + transMsg.info.handle = (void*)transAcquireExHandle(transGetSvrRefMgt(), pConn->refId); + acquire = 1; + } transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; transMsg.info.cliVer = htonl(pHead->compatibilityVer); @@ -468,10 +475,10 @@ static bool uvHandleReq(SSvrConn* pConn) { tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, pConn->refId); - if (transMsg.info.handle == NULL) { - tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn); - return false; - } + // if (transMsg.info.handle == NULL) { + // tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn); + // return false; + // } if (pHead->noResp == 1) { transMsg.info.refId = -1; @@ -483,7 +490,7 @@ static bool uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = pConn->port; tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - (void)transReleaseExHandle(transGetRefMgt(), pConn->refId); + if (acquire) transReleaseExHandle(transGetSvrRefMgt(), pConn->refId); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); return true; @@ -770,15 +777,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -874,15 +881,15 @@ static void uvPrepareCb(uv_prepare_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -1215,14 +1222,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = transAddExHandle(transGetRefMgt(), exh); + exh->refId = transAddExHandle(transGetSvrRefMgt(), exh); if (exh->refId < 0) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); } QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId); if (pSelf != exh) { TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); } @@ -1284,8 +1291,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { } static int32_t reallocConnRef(SSvrConn* conn) { if (conn->refId > 0) { - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId); } // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); @@ -1295,14 +1302,14 @@ static int32_t reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(transGetRefMgt(), exh); + exh->refId = transAddExHandle(transGetSvrRefMgt(), exh); if (exh->refId < 0) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; } QUEUE_INIT(&exh->q); - SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId); if (pSelf != exh) { tError("conn %p failed to acquire handle", conn); taosMemoryFree(exh); @@ -1321,8 +1328,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId); + (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId); STrans* pTransInst = thrd->pTransInst; tDebug("%s conn %p destroy", transLabel(pTransInst), conn); @@ -1752,15 +1759,15 @@ int32_t transReleaseSrvHandle(void* handle) { tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return 0; _return1: tDebug("handle %p failed to send to release handle", exh); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; _return2: tDebug("handle %p failed to send to release handle", exh); @@ -1803,17 +1810,17 @@ int32_t transSendResponse(const STransMsg* msg) { tGDebug("conn %p start to send resp (1/2)", exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return 0; _return1: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; _return2: tDebug("handle %p failed to send resp", exh); @@ -1848,17 +1855,17 @@ int32_t transRegisterMsg(const STransMsg* msg) { tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; } - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return 0; _return1: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(transGetRefMgt(), refId); + (void)transReleaseExHandle(transGetSvrRefMgt(), refId); return code; _return2: tDebug("handle %p failed to register brokenlink", exh);