From e62f3fa86a76a025ea5d8c53d8f42ad26ee8e86f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 18 Oct 2023 16:03:55 +0800 Subject: [PATCH 01/10] opt status send --- include/common/tmsgcb.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 3 ++ source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 5 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 53 +++++++++++++++++++ 5 files changed, 61 insertions(+), 3 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 311bffb7da..7ab69759e3 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -52,6 +52,7 @@ typedef struct { void* data; void* mgmt; void* clientRpc; + void* statusClientRpc; void* serverRpc; PutToQueueFp putToQueueFp; GetQueueSizeFp qsizeFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index c7af552da4..3453731c03 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); + rpcSendRecv(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[256]; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 3cf7a360f9..f11378bb71 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -48,6 +48,7 @@ typedef struct { typedef struct { void *serverRpc; void *clientRpc; + void *statusClientRpc; SDnodeHandle msgHandles[TDMT_MAX]; } SDnodeTrans; @@ -115,7 +116,9 @@ int32_t dmRunDnode(SDnode *pDnode); int32_t dmInitServer(SDnode *pDnode); void dmCleanupServer(SDnode *pDnode); int32_t dmInitClient(SDnode *pDnode); +int32_t dmInitStatusClient(SDnode *pDnode); void dmCleanupClient(SDnode *pDnode); +void dmCleanupStatusClient(SDnode *pDnode); SMsgCb dmGetMsgcb(SDnode *pDnode); int32_t dmInitMsgHandle(SDnode *pDnode); int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 15697dc448..5164d60ba6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -20,8 +20,8 @@ #include "qworker.h" #include "tstream.h" #ifdef TD_TSZ -#include "tglobal.h" #include "tcompression.h" +#include "tglobal.h" #endif int32_t dmInitDnode(SDnode *pDnode) { @@ -66,7 +66,7 @@ int32_t dmInitDnode(SDnode *pDnode) { goto _OVER; } - if(dmInitModule(pDnode) != 0) { + if (dmInitModule(pDnode) != 0) { goto _OVER; } @@ -91,6 +91,7 @@ void dmCleanupDnode(SDnode *pDnode) { if (pDnode == NULL) return; dmCleanupClient(pDnode); + dmCleanupStatusClient(pDnode); dmCleanupServer(pDnode); dmClearVars(pDnode); rpcCleanup(); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dc48ff71f8..b273706feb 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -358,6 +358,50 @@ int32_t dmInitClient(SDnode *pDnode) { dDebug("dnode rpc client is initialized"); return 0; } +int32_t dmInitStatusClient(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + + SRpcInit rpcInit = {0}; + rpcInit.label = "DND-STATUS-C"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; + rpcInit.sessions = 1024; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = TSDB_DEFAULT_USER; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.parent = pDnode; + rpcInit.rfp = rpcRfp; + rpcInit.compressSize = tsCompressMsgSize; + + rpcInit.retryMinInterval = tsRedirectPeriod; + rpcInit.retryStepFactor = tsRedirectFactor; + rpcInit.retryMaxInterval = tsRedirectMaxPeriod; + rpcInit.retryMaxTimeout = tsMaxRetryWaitTime; + + rpcInit.failFastInterval = 5000; // interval threshold(ms) + rpcInit.failFastThreshold = 3; // failed threshold + rpcInit.ffp = dmFailFastFp; + + int32_t connLimitNum = 100; + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + + rpcInit.connLimitNum = connLimitNum; + rpcInit.connLimitLock = 1; + rpcInit.supportBatch = 1; + rpcInit.batchSize = 8 * 1024; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); + + pTrans->statusClientRpc = rpcOpen(&rpcInit); + if (pTrans->statusClientRpc == NULL) { + dError("failed to init dnode rpc status client"); + return -1; + } + + dDebug("dnode rpc status client is initialized"); + return 0; +} void dmCleanupClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -367,6 +411,14 @@ void dmCleanupClient(SDnode *pDnode) { dDebug("dnode rpc client is closed"); } } +void dmCleanupStatusClient(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + if (pTrans->statusClientRpc) { + rpcClose(pTrans->statusClientRpc); + pTrans->statusClientRpc = NULL; + dDebug("dnode rpc status client is closed"); + } +} int32_t dmInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -405,6 +457,7 @@ void dmCleanupServer(SDnode *pDnode) { SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb msgCb = { .clientRpc = pDnode->trans.clientRpc, + .statusClientRpc = pDnode->trans.statusClientRpc, .serverRpc = pDnode->trans.serverRpc, .sendReqFp = dmSendReq, .sendRspFp = dmSendRsp, From ef27c877565a8c3baec6d4aff72f77f4520be5d6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 18 Oct 2023 16:37:34 +0800 Subject: [PATCH 02/10] opt status send --- include/util/tdef.h | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/libs/transport/src/trans.c | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index 287617970c..7f8fe22340 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -249,7 +249,7 @@ typedef enum ELogicConditionType { #define TSDB_PASSWORD_LEN 32 #define TSDB_USET_PASSWORD_LEN 129 #define TSDB_VERSION_LEN 32 -#define TSDB_LABEL_LEN 8 +#define TSDB_LABEL_LEN 12 #define TSDB_JOB_STATUS_LEN 32 #define TSDB_CLUSTER_ID_LEN 40 diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b273706feb..dbd68a2fbe 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -362,7 +362,7 @@ int32_t dmInitStatusClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; SRpcInit rpcInit = {0}; - rpcInit.label = "DND-STATUS-C"; + rpcInit.label = "DND-STATUS"; rpcInit.numOfThreads = 1; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index e9aa62eded..7b1ae087f2 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -41,7 +41,8 @@ void* rpcOpen(const SRpcInit* pInit) { return NULL; } if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, sizeof(pRpc->label)); + int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label); + tstrncpy(pRpc->label, pInit->label, len); } pRpc->compressSize = pInit->compressSize; From 1910540a611c92b1dbba152954f2876af498147e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 10:15:04 +0800 Subject: [PATCH 03/10] add rpc sync read timeout --- source/libs/transport/inc/transComm.h | 23 ++++-- source/libs/transport/src/trans.c | 3 + source/libs/transport/src/transCli.c | 100 +++++++++++++++++++++++--- source/libs/transport/src/transComm.c | 21 +++++- 4 files changed, 131 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c82b7c0532..73427446e6 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -119,6 +119,13 @@ typedef struct SExHandle { void* pThrd; } SExHandle; +typedef struct { + STransMsg* pRsp; + tsem_t* pSem; + int8_t inited; + SRWLatch latch; +} STransSyncMsg; + /*convet from fqdn to ip */ typedef struct SCvtAddr { char ip[TSDB_FQDN_LEN]; @@ -133,11 +140,13 @@ typedef struct { tmsg_t msgType; // message type int8_t connType; // connection type cli/srv - STransCtx appCtx; // - STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API - SCvtAddr cvtAddr; - bool setMaxRetry; + STransCtx appCtx; // + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + STransSyncMsg* pSyncMsg; // for syncchronous with timeout API + int64_t syncMsgRef; + SCvtAddr cvtAddr; + bool setMaxRetry; int32_t retryMinInterval; int32_t retryMaxInterval; @@ -307,6 +316,7 @@ int transReleaseSrvHandle(void* handle); int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int32_t timeoutMs); int transSendResponse(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); @@ -432,10 +442,11 @@ int64_t transAddExHandle(int32_t refMgt, void* p); int32_t transRemoveExHandle(int32_t refMgt, int64_t refId); void* transAcquireExHandle(int32_t refMgt, int64_t refId); int32_t transReleaseExHandle(int32_t refMgt, int64_t refId); -void transDestoryExHandle(void* handle); +void transDestroyExHandle(void* handle); int32_t transGetRefMgt(); int32_t transGetInstMgt(); +int32_t transGetSyncMsgMgt(); void transHttpEnvDestroy(); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 7b1ae087f2..6842d9ee82 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -169,6 +169,9 @@ int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, in int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { return transSendRecv(shandle, pEpSet, pMsg, pRsp); } +int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int32_t timeoutMs) { + return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, timeoutMs); +} int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 677e08ec56..3156a0733c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2411,15 +2411,24 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } } } - if (pCtx->pSem != NULL) { + if (pCtx->pSem || pCtx->syncMsgRef != 0) { tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn); - if (pCtx->pRsp == NULL) { - tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); + if (pCtx->pSem) { + if (pCtx->pRsp == NULL) { + tGTrace("%s conn %p(sync) failed to resp, ignore", CONN_GET_INST_LABEL(pConn), pConn); + } else { + memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); + } + tsem_post(pCtx->pSem); + pCtx->pRsp = NULL; } else { - memcpy((char*)pCtx->pRsp, (char*)pResp, sizeof(*pResp)); + STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); + if (pSyncMsg != NULL) { + memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); + tsem_post(pSyncMsg->pSem); + taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); + } } - tsem_post(pCtx->pSem); - pCtx->pRsp = NULL; } else { tGTrace("%s conn %p handle resp", CONN_GET_INST_LABEL(pConn), pConn); if (retry == false && hasEpSet == true) { @@ -2563,7 +2572,8 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran } int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); return -1; @@ -2587,7 +2597,7 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; pCtx->pSem = sem; - pCtx->pRsp = pRsp; + pCtx->pRsp = pTransRsp; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; @@ -2607,10 +2617,84 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs } tsem_wait(sem); + memcpy(pRsp, pTransRsp, sizeof(STransMsg)); + _RETURN: tsem_destroy(sem); taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosMemoryFree(pTransRsp); + return ret; +} +int64_t transCreateSyncMsg(STransMsg* pTransMsg) { + tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); + tsem_init(sem, 0, 0); + + STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); + + taosInitRWLatch(&pSyncMsg->latch); + pSyncMsg->inited = 0; + pSyncMsg->pRsp = pTransMsg; + pSyncMsg->pSem = sem; + + return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); +} +int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int32_t timeoutMs) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return -1; + } + + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + + TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); + + STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + epsetAssign(&pCtx->epSet, pEpSet); + epsetAssign(&pCtx->origEpSet, pEpSet); + pCtx->ahandle = pReq->info.ahandle; + pCtx->msgType = pReq->msgType; + pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg); + + int64_t ref = pCtx->syncMsgRef; + STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref); + + SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + cliMsg->ctx = pCtx; + cliMsg->msg = *pReq; + cliMsg->st = taosGetTimestampUs(); + cliMsg->type = Normal; + cliMsg->refId = (int64_t)shandle; + + STraceId* trace = &pReq->info.traceId; + tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, + EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); + + int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (ret != 0) { + destroyCmsg(cliMsg); + goto _RETURN; + } + + ret = tsem_timewait(pSyncMsg->pSem, timeoutMs); + if (ret < 0) { + pRsp->code = TSDB_CODE_TIMEOUT_ERROR; + ret = TSDB_CODE_TIMEOUT_ERROR; + } else { + memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); + ret = 0; + } +_RETURN: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosReleaseRef(transGetSyncMsgMgt(), ref); + taosRemoveRef(transGetSyncMsgMgt(), ref); return ret; } /* diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 3dc59a93ee..759a4d79db 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -21,6 +21,9 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; static int32_t instMgt; +static int32_t transSyncMsgMgt; + +void transDestroySyncMsg(void* msg); int32_t transCompressMsg(char* msg, int32_t len) { int32_t ret = 0; @@ -601,13 +604,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } static void transInitEnv() { - refMgt = transOpenRefMgt(50000, transDestoryExHandle); + refMgt = transOpenRefMgt(50000, transDestroyExHandle); instMgt = taosOpenRef(50, rpcCloseImpl); + transSyncMsgMgt = taosOpenRef(50, transDestroySyncMsg); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } static void transDestroyEnv() { transCloseRefMgt(refMgt); transCloseRefMgt(instMgt); + transCloseRefMgt(transSyncMsgMgt); } void transInit() { @@ -617,6 +622,7 @@ void transInit() { int32_t transGetRefMgt() { return refMgt; } int32_t transGetInstMgt() { return instMgt; } +int32_t transGetSyncMsgMgt() { return transSyncMsgMgt; } void transCleanup() { // clean env @@ -648,13 +654,24 @@ int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) { // release extern handle return taosReleaseRef(refMgt, refId); } -void transDestoryExHandle(void* handle) { +void transDestroyExHandle(void* handle) { if (handle == NULL) { return; } taosMemoryFree(handle); } +void transDestroySyncMsg(void* msg) { + if (msg == NULL) return; + + STransSyncMsg* pSyncMsg = msg; + tsem_destroy(pSyncMsg->pSem); + taosMemoryFree(pSyncMsg->pSem); + + taosMemoryFree(pSyncMsg->pRsp); + taosMemoryFree(pSyncMsg); +} + // void subnetIp2int(const char* const ip_addr, uint8_t* dst) { // char ip_addr_cpy[20]; // char ip[5]; From 365d535e6d2a1df5f9a9a7e4b33b2f3f92132307 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 10:32:51 +0800 Subject: [PATCH 04/10] add rpc sync read timeout --- source/libs/transport/src/transCli.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3156a0733c..842f961141 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2576,12 +2576,14 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); + taosMemoryFree(pTransRsp); return -1; } SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { transFreeMsg(pReq->pCont); + taosMemoryFree(pTransRsp); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } @@ -2644,12 +2646,14 @@ int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pRe STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); + taosMemoryFree(pTransMsg); return -1; } SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { transFreeMsg(pReq->pCont); + taosMemoryFree(pTransMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } From b9b62a9a005982aa908fe7636cd8bb0fcc4ef183 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 10:38:45 +0800 Subject: [PATCH 05/10] add rpc sync read timeout --- include/libs/transport/trpc.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 83dbf2c4af..accb7e6f24 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -163,6 +163,7 @@ int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc in // These functions will not be called in the child process int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int32_t timeoutMs); int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); void *rpcAllocHandle(); void rpcSetIpWhite(void *thandl, void *arg); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 3453731c03..7edd6d7d63 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecv(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp); + rpcSendRecvWithTimeout(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp, 1000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[256]; From 27b2d37bde83733575b844ce77e24a2eb159cbe2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 11:27:31 +0800 Subject: [PATCH 06/10] add rpc sync read timeout --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 7edd6d7d63..ffa8e6d7da 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp, 1000); + rpcSendRecvWithTimeout(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp, 2000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[256]; From b4b742b3fb611718cb83288d8248f6e7284fe20a Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 11:35:29 +0800 Subject: [PATCH 07/10] add rpc sync read timeout --- source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ffa8e6d7da..3ca782d670 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp, 2000); + rpcSendRecvWithTimeout(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp, 5000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[256]; From afac84465678104668812a72fa8efbca59f10e0e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 11:45:59 +0800 Subject: [PATCH 08/10] add rpc sync read time --- source/libs/transport/src/transCli.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 842f961141..ef60c8a94e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2427,6 +2427,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); tsem_post(pSyncMsg->pSem); taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); + } else { + rpcFreeCont(pResp->pCont); } } } else { From 0188289308ed4c77e3242f423aea8ea96da0fb45 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 14:07:54 +0800 Subject: [PATCH 09/10] statusClientRpc --- include/common/tmsgcb.h | 1 - source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 2 +- source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 1 - source/dnode/mgmt/node_mgmt/src/dmTransport.c | 21 +++++++++---------- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 7ab69759e3..311bffb7da 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -52,7 +52,6 @@ typedef struct { void* data; void* mgmt; void* clientRpc; - void* statusClientRpc; void* serverRpc; PutToQueueFp putToQueueFp; GetQueueSizeFp qsizeFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 3ca782d670..991f17f326 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -160,7 +160,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.statusClientRpc, &epSet, &rpcMsg, &rpcRsp, 5000); + rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, 5000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); char tbuf[256]; diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index f11378bb71..20789772e5 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -48,7 +48,6 @@ typedef struct { typedef struct { void *serverRpc; void *clientRpc; - void *statusClientRpc; SDnodeHandle msgHandles[TDMT_MAX]; } SDnodeTrans; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dbd68a2fbe..ce6b21dd56 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -393,11 +393,11 @@ int32_t dmInitStatusClient(SDnode *pDnode) { rpcInit.timeToGetConn = tsTimeToGetAvailableConn; taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); - pTrans->statusClientRpc = rpcOpen(&rpcInit); - if (pTrans->statusClientRpc == NULL) { - dError("failed to init dnode rpc status client"); - return -1; - } + // pTrans->statusClientRpc = rpcOpen(&rpcInit); + // if (pTrans->statusClientRpc == NULL) { + // dError("failed to init dnode rpc status client"); + // return -1; + // } dDebug("dnode rpc status client is initialized"); return 0; @@ -413,11 +413,11 @@ void dmCleanupClient(SDnode *pDnode) { } void dmCleanupStatusClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; - if (pTrans->statusClientRpc) { - rpcClose(pTrans->statusClientRpc); - pTrans->statusClientRpc = NULL; - dDebug("dnode rpc status client is closed"); - } + // if (pTrans->statusClientRpc) { + // rpcClose(pTrans->statusClientRpc); + // pTrans->statusClientRpc = NULL; + // dDebug("dnode rpc status client is closed"); + // } } int32_t dmInitServer(SDnode *pDnode) { @@ -457,7 +457,6 @@ void dmCleanupServer(SDnode *pDnode) { SMsgCb dmGetMsgcb(SDnode *pDnode) { SMsgCb msgCb = { .clientRpc = pDnode->trans.clientRpc, - .statusClientRpc = pDnode->trans.statusClientRpc, .serverRpc = pDnode->trans.serverRpc, .sendReqFp = dmSendReq, .sendRspFp = dmSendRsp, From 93cac3532e0521dc5125589139551d3b6c143046 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 19 Oct 2023 15:30:04 +0800 Subject: [PATCH 10/10] fix invalid debug info --- source/libs/transport/src/trans.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 6842d9ee82..b23d229931 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -42,7 +42,7 @@ void* rpcOpen(const SRpcInit* pInit) { } if (pInit->label) { int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label); - tstrncpy(pRpc->label, pInit->label, len); + memcpy(pRpc->label, pInit->label, len); } pRpc->compressSize = pInit->compressSize;