From 50b66df39f9348191be34ee378b94d90e415896a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 10 Dec 2022 14:02:57 +0800 Subject: [PATCH 1/2] fix: control rpc qitem memory --- include/util/tqueue.h | 3 ++- source/client/src/clientTmq.c | 12 ++++++------ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 2 +- source/dnode/mgmt/mgmt_snode/src/smWorker.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/executor/src/dataDeleter.c | 2 +- source/libs/executor/src/dataDispatcher.c | 2 +- source/libs/stream/src/stream.c | 6 +++--- source/libs/stream/src/streamData.c | 6 +++--- source/libs/stream/src/streamExec.c | 4 ++-- source/libs/sync/test/sync_test_lib/src/syncIO.c | 6 +++--- source/libs/transport/test/svrBench.c | 2 +- source/util/src/tqueue.c | 10 ++++++---- 17 files changed, 36 insertions(+), 33 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 8b46bbd064..25a0019106 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -65,6 +65,7 @@ typedef struct STaosQnode { STaosQnode *next; STaosQueue *queue; int64_t timestamp; + int32_t dataSize; int32_t size; int8_t itype; int8_t reserved[3]; @@ -103,7 +104,7 @@ typedef struct STaosQall { STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); -void *taosAllocateQitem(int32_t size, EQItype itype); +void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize); void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index db717a4e4e..f82e54515f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -691,7 +691,7 @@ void tmqAssignAskEpTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); *pTaskType = TMQ_DELAYED_TASK__ASK_EP; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -703,7 +703,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); *pTaskType = TMQ_DELAYED_TASK__COMMIT; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -715,7 +715,7 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) { int64_t refId = *(int64_t*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq != NULL) { - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); *pTaskType = TMQ_DELAYED_TASK__REPORT; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -1140,7 +1140,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { goto CREATE_MSG_FAIL; } if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch); goto CREATE_MSG_FAIL; @@ -1173,7 +1173,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -1363,7 +1363,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { tmqUpdateEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { - SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); + SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); if (pWrapper == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 857fbcbce5..5d86e11c57 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -159,7 +159,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } if (pWorker == NULL) return -1; - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 3e5ad65db7..579a4e2090 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -58,7 +58,7 @@ int32_t qmPutNodeMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 2e66aeae37..e1cc378396 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -130,7 +130,7 @@ void smStopWorker(SSnodeMgmt *pMgmt) { } int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 08ea880b97..c48216ed7d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -233,7 +233,7 @@ int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) { rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 99f8bd002a..33646d6d67 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -141,7 +141,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } pRpc->info.wrapper = pWrapper; - pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); + pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM, pRpc->contLen); if (pMsg == NULL) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6bd2ae3435..ba99e5515d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -763,7 +763,7 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu tb_uid_t suid) { const SSubmitReq *pReq = (const SSubmitReq *)pMsg; - void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM); + void *qItem = taosAllocateQitem(pReq->header.contLen, DEF_QITEM, 0); if (!qItem) { return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 191fc1b49c..af3137f7b9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1265,7 +1265,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); if (!failed) { - SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM); + SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); pRefBlock->type = STREAM_INPUT__REF_DATA_BLOCK; pRefBlock->pBlock = pDelBlock; pRefBlock->dataRef = pRef; @@ -1297,7 +1297,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } #if 0 - SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pStreamBlock = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); pStreamBlock->type = STREAM_INPUT__DATA_BLOCK; pStreamBlock->blocks = taosArrayInit(0, sizeof(SSDataBlock)); SSDataBlock block = {0}; diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 12fb449238..eff7a5ef93 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -133,7 +133,7 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM); + SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM, 0); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index c45226e02b..c2fa438c80 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -126,7 +126,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM); + SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 79549675a3..5b542dd54b 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -56,7 +56,7 @@ void streamSchedByTimer(void* param, void* tmrId) { } if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { - SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); + SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0); if (trigger == NULL) return; trigger->type = STREAM_INPUT__GET_RES; trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -112,7 +112,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { } int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status; // enqueue @@ -150,7 +150,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR } int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status = TASK_INPUT_STATUS__NORMAL; // enqueue diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 2fea5b9eca..6cc684dddf 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock } SStreamDataSubmit* streamDataSubmitNew(SSubmitReq* pReq) { - SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0); if (pDataSubmit == NULL) return NULL; pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); if (pDataSubmit->dataRef == NULL) goto FAIL; @@ -81,7 +81,7 @@ FAIL: } SStreamMergedSubmit* streamMergedSubmitNew() { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); if (pMerged == NULL) return NULL; pMerged->reqs = taosArrayInit(0, sizeof(void*)); pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); @@ -107,7 +107,7 @@ static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) } SStreamDataSubmit* streamSubmitRefClone(SStreamDataSubmit* pSubmit) { - SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, 0); if (pSubmitClone == NULL) { return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d20c2902f5..20608a6cf3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -127,7 +127,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { taosArrayDestroy(pRes); break; } - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -235,7 +235,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("stream task %d exec end", pTask->taskId); if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); streamFreeQitem(input); diff --git a/source/libs/sync/test/sync_test_lib/src/syncIO.c b/source/libs/sync/test/sync_test_lib/src/syncIO.c index 415fecd9f2..4b305f823f 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncIO.c +++ b/source/libs/sync/test/sync_test_lib/src/syncIO.c @@ -97,7 +97,7 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { syncRpcMsgLog2(logBuf, pMsg); SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); STaosQueue *pMsgQ = gSyncIO->pMsgQ; @@ -381,7 +381,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg); SSyncIO *io = pParent; SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); taosWriteQitem(io->pMsgQ, pTemp); } @@ -441,7 +441,7 @@ static void syncIOTickQ(void *param, void *tmrId) { SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsg, &rpcMsg); SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg); taosWriteQitem(io->pMsgQ, pTemp); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 464559c1e0..4e2395b17b 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -128,7 +128,7 @@ void *processShellMsg(void *arg) { void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM, 0); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); int32_t idx = balance % multiQ->numOfThread; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 0f992184b5..1079dff0bf 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -109,20 +109,22 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { return memOfItems; } -void *taosAllocateQitem(int32_t size, EQItype itype) { +void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pNode->dataSize = dataSize; pNode->size = size; pNode->itype = itype; pNode->timestamp = taosGetTimestampUs(); if (itype == RPC_QITEM) { - int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size); + int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); if (alloced > tsRpcQueueMemoryAllowed) { + uError("failed to alloc qitem, size:%d alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, tsRpcQueueMemoryUsed); taosMemoryFree(pNode); terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; return NULL; @@ -139,8 +141,8 @@ void taosFreeQitem(void *pItem) { if (pItem == NULL) return; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); - if (pNode->itype > 0) { - int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size); + if (pNode->itype == RPC_QITEM) { + int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size + pNode->dataSize); uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); } else { uTrace("item:%p, node:%p is freed", pItem, pNode); From 652f51fec524eb93bd269f0996b6c06a3b660607 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 10 Dec 2022 15:09:55 +0800 Subject: [PATCH 2/2] fix: control rpc qitem memory --- include/util/tqueue.h | 4 ++-- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 4 ++-- source/dnode/mgmt/mgmt_snode/src/smWorker.c | 4 ++-- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/util/src/tqueue.c | 6 ++++-- 7 files changed, 13 insertions(+), 11 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 25a0019106..1f6b205cdf 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -65,7 +65,7 @@ typedef struct STaosQnode { STaosQnode *next; STaosQueue *queue; int64_t timestamp; - int32_t dataSize; + int64_t dataSize; int32_t size; int8_t itype; int8_t reserved[3]; @@ -104,7 +104,7 @@ typedef struct STaosQall { STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); -void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize); +void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize); void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 5d86e11c57..095857825d 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -164,7 +164,7 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { memcpy(pMsg, pRpc, sizeof(SRpcMsg)); pRpc->pCont = NULL; - dTrace("msg:%p, is created and will put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); + dTrace("msg:%p, is created and will put into %s queue, type:%s len:%d", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType), pRpc->contLen); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 579a4e2090..28da0f9c5f 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -65,12 +65,12 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { switch (qtype) { case QUERY_QUEUE: - dTrace("msg:%p, is created and will put into qnode-query queue", pMsg); + dTrace("msg:%p, is created and will put into qnode-query queue, len:%d", pMsg, pRpc->contLen); taosWriteQitem(pMgmt->queryWorker.queue, pMsg); return 0; case READ_QUEUE: case FETCH_QUEUE: - dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg); + dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen); taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); return 0; default: diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index e1cc378396..9bd5be5201 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -139,8 +139,8 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SSnode *pSnode = pMgmt->pSnode; if (pSnode == NULL) { - dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d", pMsg, terrstr(), - TMSG_INFO(pMsg->msgType), qtype); + dError("msg:%p failed to put into snode queue since %s, type:%s qtype:%d len:%d", pMsg, terrstr(), + TMSG_INFO(pMsg->msgType), qtype, pRpc->contLen); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); pRpc->pCont = NULL; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index c48216ed7d..7e3915f3d1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -241,7 +241,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { } SMsgHead *pHead = pRpc->pCont; - dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); + dTrace("vgId:%d, msg:%p is created, type:%s len:%d", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType), pRpc->contLen); pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 33646d6d67..4fa09a46b7 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -145,7 +145,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pMsg == NULL) goto _OVER; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); + dGTrace("msg:%p, is created, type:%s handle:%p len:%d", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle, pRpc->contLen); code = dmProcessNodeMsg(pWrapper, pMsg); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 1079dff0bf..42b6358893 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -109,7 +109,7 @@ int64_t taosQueueMemorySize(STaosQueue *queue) { return memOfItems; } -void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) { +void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -124,7 +124,9 @@ void *taosAllocateQitem(int32_t size, EQItype itype, int32_t dataSize) { if (itype == RPC_QITEM) { int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); if (alloced > tsRpcQueueMemoryAllowed) { - uError("failed to alloc qitem, size:%d alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, tsRpcQueueMemoryUsed); + uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, + tsRpcQueueMemoryUsed); + atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, size + dataSize); taosMemoryFree(pNode); terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; return NULL;