From 79f1e90743e883b8c7b7c034ba139e895fa2e88b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 17:11:53 +0800 Subject: [PATCH 1/4] fix: oom in rpc queue --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 3 ++- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 4 +++- source/util/src/tqueue.c | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 45d1486912..8c1b33cb14 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -287,7 +287,8 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { return -1; } - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); + EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM; + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 74bf1f964c..bc269a6410 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -208,7 +208,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } pRpc->info.wrapper = pWrapper; - pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); + + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // resp msg is not limited by tsRpcQueueMemoryUsed + pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 7a4eb09b99..aa8834c89f 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -494,6 +494,8 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo * qall->start = queue->head; qall->numOfItems = queue->numOfItems; qall->memOfItems = queue->memOfItems; + qall->unAccessedNumOfItems = queue->numOfItems; + qall->unAccessMemOfItems = queue->memOfItems; code = qall->numOfItems; qinfo->ahandle = queue->ahandle; From a9a6747ac09cead5d0104def2a7b963ee1556d95 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 18:12:55 +0800 Subject: [PATCH 2/4] fix: oom in rpc queue --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index bc269a6410..c3dfc1a64c 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -209,7 +209,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; - EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // resp msg is not limited by tsRpcQueueMemoryUsed + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg should not be restricted by tsRpcQueueMemoryUsed pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) goto _OVER; From 81577b82222013e84a485004f5b8a4846d8f9002 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 18:19:08 +0800 Subject: [PATCH 3/4] fix: oom in rpc queue --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index c3dfc1a64c..99d641ff3f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -209,7 +209,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { pRpc->info.wrapper = pWrapper; - EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg should not be restricted by tsRpcQueueMemoryUsed + EQItype itype = IsReq(pRpc) ? RPC_QITEM : DEF_QITEM; // rsp msg is not restricted by tsRpcQueueMemoryUsed pMsg = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen); if (pMsg == NULL) goto _OVER; From 2b9df7b45ce99cc54a8b6f43c6b4ce965199c397 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 4 Jul 2024 18:36:17 +0800 Subject: [PATCH 4/4] fix: oom in rpc queue --- source/util/src/tqueue.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index aa8834c89f..45a8a462fb 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); if (alloced > tsRpcQueueMemoryAllowed) { uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, - tsRpcQueueMemoryUsed); + tsRpcQueueMemoryAllowed); atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); taosMemoryFree(pNode); terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE;