Merge pull request #27678 from taosdata/fix/optTransportPara

Fix/optTransportPara
This commit is contained in:
Hongze Cheng 2024-09-05 16:20:31 +08:00 committed by GitHub
commit 6b455cfb55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 55 additions and 43 deletions

View File

@ -499,7 +499,7 @@ typedef enum ELogicConditionType {
#ifdef WINDOWS #ifdef WINDOWS
#define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections.
#else #else
#define TSDB_MAX_RPC_THREADS 10 #define TSDB_MAX_RPC_THREADS 50
#endif #endif
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type

View File

@ -272,19 +272,19 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool);
} \ } \
} while (0) } while (0)
#define ASYNC_CHECK_HANDLE(exh1, id) \ #define ASYNC_CHECK_HANDLE(exh1, id) \
do { \ do { \
if (id > 0) { \ if (id > 0) { \
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \ if (exh2 == NULL || id != exh2->refId) { \
tDebug("ref:%" PRId64 " already released" PRIu64, id); \ tDebug("ref:%" PRId64 " already released", id); \
code = terrno; \ code = terrno; \
goto _return1; \ goto _return1; \
} \ } \
} else { \ } else { \
tWarn("invalid handle to release"); \ tDebug("invalid handle to release"); \
goto _return2; \ goto _return2; \
} \ } \
} while (0) } while (0)
int32_t transInitBuffer(SConnBuffer* buf); int32_t transInitBuffer(SConnBuffer* buf);
@ -443,6 +443,7 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestroyExHandle(void* handle); void transDestroyExHandle(void* handle);
int32_t transGetRefMgt(); int32_t transGetRefMgt();
int32_t transGetSvrRefMgt();
int32_t transGetInstMgt(); int32_t transGetInstMgt();
int32_t transGetSyncMsgMgt(); int32_t transGetSyncMsgMgt();

View File

@ -20,6 +20,7 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
static int32_t svrRefMgt;
static int32_t instMgt; static int32_t instMgt;
static int32_t transSyncMsgMgt; static int32_t transSyncMsgMgt;
@ -704,12 +705,14 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b) {
static void transInitEnv() { static void transInitEnv() {
refMgt = transOpenRefMgt(50000, transDestroyExHandle); refMgt = transOpenRefMgt(50000, transDestroyExHandle);
svrRefMgt = transOpenRefMgt(50000, transDestroyExHandle);
instMgt = taosOpenRef(50, rpcCloseImpl); instMgt = taosOpenRef(50, rpcCloseImpl);
transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg); transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg);
(void)uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); (void)uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
} }
static void transDestroyEnv() { static void transDestroyEnv() {
transCloseRefMgt(refMgt); transCloseRefMgt(refMgt);
transCloseRefMgt(svrRefMgt);
transCloseRefMgt(instMgt); transCloseRefMgt(instMgt);
transCloseRefMgt(transSyncMsgMgt); transCloseRefMgt(transSyncMsgMgt);
} }
@ -724,6 +727,7 @@ int32_t transInit() {
} }
int32_t transGetRefMgt() { return refMgt; } int32_t transGetRefMgt() { return refMgt; }
int32_t transGetSvrRefMgt() { return svrRefMgt; }
int32_t transGetInstMgt() { return instMgt; } int32_t transGetInstMgt() { return instMgt; }
int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; } int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; }

View File

@ -373,6 +373,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
SWorkThrd* pThrd = pConn->hostThrd; SWorkThrd* pThrd = pConn->hostThrd;
int8_t acquire = 0;
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
@ -459,7 +460,13 @@ static bool uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
if (pHead->noResp == 1) {
transMsg.info.handle = NULL;
} else {
transMsg.info.handle = (void*)transAcquireExHandle(transGetSvrRefMgt(), pConn->refId);
acquire = 1;
}
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
transMsg.info.cliVer = htonl(pHead->compatibilityVer); transMsg.info.cliVer = htonl(pHead->compatibilityVer);
@ -468,10 +475,10 @@ static bool uvHandleReq(SSvrConn* pConn) {
tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn, tGTrace("%s handle %p conn:%p translated to app, refId:%" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn,
pConn->refId); pConn->refId);
if (transMsg.info.handle == NULL) { // if (transMsg.info.handle == NULL) {
tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn); // tError("%s handle %p conn:%p handle failed to init" PRIu64, transLabel(pTransInst), transMsg.info.handle, pConn);
return false; // return false;
} // }
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
transMsg.info.refId = -1; transMsg.info.refId = -1;
@ -483,7 +490,7 @@ static bool uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = pConn->port; pConnInfo->clientPort = pConn->port;
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
(void)transReleaseExHandle(transGetRefMgt(), pConn->refId); if (acquire) transReleaseExHandle(transGetSvrRefMgt(), pConn->refId);
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
return true; return true;
@ -770,15 +777,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
} }
@ -874,15 +881,15 @@ static void uvPrepareCb(uv_prepare_t* handle) {
SExHandle* exh1 = transMsg.info.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
} }
@ -1215,14 +1222,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) {
exh->handle = pConn; exh->handle = pConn;
exh->pThrd = pThrd; exh->pThrd = pThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetSvrRefMgt(), exh);
if (exh->refId < 0) { if (exh->refId < 0) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
} }
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId);
if (pSelf != exh) { if (pSelf != exh) {
TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end);
} }
@ -1284,8 +1291,8 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) {
} }
static int32_t reallocConnRef(SSvrConn* conn) { static int32_t reallocConnRef(SSvrConn* conn) {
if (conn->refId > 0) { if (conn->refId > 0) {
(void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId);
} }
// avoid app continue to send msg on invalid handle // avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
@ -1295,14 +1302,14 @@ static int32_t reallocConnRef(SSvrConn* conn) {
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(transGetRefMgt(), exh); exh->refId = transAddExHandle(transGetSvrRefMgt(), exh);
if (exh->refId < 0) { if (exh->refId < 0) {
taosMemoryFree(exh); taosMemoryFree(exh);
return TSDB_CODE_REF_INVALID_ID; return TSDB_CODE_REF_INVALID_ID;
} }
QUEUE_INIT(&exh->q); QUEUE_INIT(&exh->q);
SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); SExHandle* pSelf = transAcquireExHandle(transGetSvrRefMgt(), exh->refId);
if (pSelf != exh) { if (pSelf != exh) {
tError("conn %p failed to acquire handle", conn); tError("conn %p failed to acquire handle", conn);
taosMemoryFree(exh); taosMemoryFree(exh);
@ -1321,8 +1328,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
} }
SWorkThrd* thrd = conn->hostThrd; SWorkThrd* thrd = conn->hostThrd;
(void)transReleaseExHandle(transGetRefMgt(), conn->refId); (void)transReleaseExHandle(transGetSvrRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId); (void)transRemoveExHandle(transGetSvrRefMgt(), conn->refId);
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
tDebug("%s conn %p destroy", transLabel(pTransInst), conn); tDebug("%s conn %p destroy", transLabel(pTransInst), conn);
@ -1752,15 +1759,15 @@ int32_t transReleaseSrvHandle(void* handle) {
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
} }
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tDebug("handle %p failed to send to release handle", exh); tDebug("handle %p failed to send to release handle", exh);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
_return2: _return2:
tDebug("handle %p failed to send to release handle", exh); tDebug("handle %p failed to send to release handle", exh);
@ -1803,17 +1810,17 @@ int32_t transSendResponse(const STransMsg* msg) {
tGDebug("conn %p start to send resp (1/2)", exh->handle); tGDebug("conn %p start to send resp (1/2)", exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
} }
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tDebug("handle %p failed to send resp", exh); tDebug("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
_return2: _return2:
tDebug("handle %p failed to send resp", exh); tDebug("handle %p failed to send resp", exh);
@ -1848,17 +1855,17 @@ int32_t transRegisterMsg(const STransMsg* msg) {
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) {
destroySmsg(m); destroySmsg(m);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
} }
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return 0; return 0;
_return1: _return1:
tDebug("handle %p failed to register brokenlink", exh); tDebug("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
(void)transReleaseExHandle(transGetRefMgt(), refId); (void)transReleaseExHandle(transGetSvrRefMgt(), refId);
return code; return code;
_return2: _return2:
tDebug("handle %p failed to register brokenlink", exh); tDebug("handle %p failed to register brokenlink", exh);