From 7ad96ac276424b3dc49be61f563b22756f5ef96d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 14 May 2022 11:53:46 +0800 Subject: [PATCH] enh: control the memory of the rpc queue --- include/common/tglobal.h | 1 - include/util/tprocess.h | 3 +- include/util/tqueue.h | 12 ++- source/client/src/tmq.c | 10 +-- source/common/src/tglobal.c | 2 - source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/libs/executor/src/dataDispatcher.c | 2 +- source/libs/sync/src/syncIO.c | 6 +- source/libs/transport/test/pushServer.c | 2 +- source/libs/transport/test/rserver.c | 2 +- source/util/src/tprocess.c | 4 +- source/util/src/tqueue.c | 74 +++++++++++-------- 15 files changed, 70 insertions(+), 56 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 94b6ca551b..83283cfdad 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -69,7 +69,6 @@ extern int32_t tsNumOfQnodeFetchThreads; extern int32_t tsNumOfSnodeSharedThreads; extern int32_t tsNumOfSnodeUniqueThreads; extern int64_t tsRpcQueueMemoryAllowed; -extern int64_t tsRpcQueueMemoryUsed; // monitor extern bool tsEnableMonitor; diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 7e1767441e..5e5a982ec4 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -17,6 +17,7 @@ #define _TD_UTIL_PROCESS_H_ #include "os.h" +#include "tqueue.h" #ifdef __cplusplus extern "C" { @@ -25,7 +26,7 @@ extern "C" { typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType; typedef struct SProcObj SProcObj; -typedef void *(*ProcMallocFp)(int32_t contLen); +typedef void *(*ProcMallocFp)(int32_t contLen, EQItype itype); typedef void *(*ProcFreeFp)(void *pCont); typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype); diff --git a/include/util/tqueue.h b/include/util/tqueue.h index fc02ea2491..dbc4d03177 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -48,18 +48,24 @@ typedef struct { int32_t threadNum; } SQueueInfo; +typedef enum { + DEF_QITEM = 0, + RPC_QITEM = 1, +} EQItype; + typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems); STaosQueue *taosOpenQueue(); void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); -void *taosAllocateQitem(int32_t size); +void *taosAllocateQitem(int32_t size, EQItype itype); void taosFreeQitem(void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); int32_t taosQueueItemSize(STaosQueue *queue); +int64_t taosQueueMemorySize(STaosQueue *queue); STaosQall *taosAllocateQall(); void taosFreeQall(STaosQall *qall); @@ -77,8 +83,8 @@ int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); void taosResetQsetThread(STaosQset *qset, void *pItem); -int32_t taosGetQueueItemsNumber(STaosQueue *queue); -int32_t taosGetQsetItemsNumber(STaosQset *qset); + +extern int64_t tsRpcQueueMemoryAllowed; #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 639f00ab86..2c3f64c5c1 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -316,7 +316,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { void tmqAssignDelayedHbTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); *pTaskType = TMQ_DELAYED_TASK__HB; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -324,7 +324,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) { void tmqAssignDelayedCommitTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); *pTaskType = TMQ_DELAYED_TASK__COMMIT; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -332,7 +332,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { void tmqAssignDelayedReportTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; - int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t)); + int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); *pTaskType = TMQ_DELAYED_TASK__REPORT; taosWriteQitem(tmq->delayedTask, pTaskType); tsem_post(&tmq->rspSem); @@ -848,7 +848,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); } - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper)); + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); goto CREATE_MSG_FAIL; @@ -987,7 +987,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tmqUpdateEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { - SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); + SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); if (pWrapper == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 338a406eec..025ad0ca0d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -61,8 +61,6 @@ int32_t tsNumOfQnodeQueryThreads = 2; int32_t tsNumOfQnodeFetchThreads = 2; int32_t tsNumOfSnodeSharedThreads = 2; int32_t tsNumOfSnodeUniqueThreads = 2; -int64_t tsRpcQueueMemoryAllowed = 0; -int64_t tsRpcQueueMemoryUsed = 0; // monitor bool tsEnableMonitor = true; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index f55e02e50a..622b8332fc 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -104,7 +104,7 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { } static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); if (pMsg == NULL) return -1; dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 40b91cb93e..daac7f80bb 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -102,7 +102,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { } static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); if (pMsg == NULL) { return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 1f671179b6..d6e99c0899 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -326,7 +326,7 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); int32_t code = 0; if (pMsg != NULL) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 30ec04d9ef..9f350409c8 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -79,7 +79,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe needRelease = true; if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER; - if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER; + if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER; if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER; if (pWrapper->procType != DND_PROC_PARENT) { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 1d371d43f6..fa9e27a5f8 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -124,7 +124,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; - SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); + SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index deb158cbae..a3d1717c3b 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -92,7 +92,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg); SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); STaosQueue *pMsgQ = queue; @@ -360,7 +360,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { syncRpcMsgLog2((char *)"==syncIOProcessRequest==", pMsg); SSyncIO *io = pParent; SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); taosWriteQitem(io->pMsgQ, pTemp); } @@ -420,7 +420,7 @@ static void syncIOTickQ(void *param, void *tmrId) { SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsg, &rpcMsg); SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); syncRpcMsgLog2((char *)"==syncIOTickQ==", &rpcMsg); taosWriteQitem(io->pMsgQ, pTemp); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index 3099998f57..d8d78c2842 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -114,7 +114,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 14d109dc5a..80785340d1 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -103,7 +103,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 8963a4f94e..8b4fd235fd 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -258,8 +258,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea int16_t headLen = CEIL8(rawHeadLen); int32_t bodyLen = CEIL8(rawBodyLen); - void *pHead = (*mallocHeadFp)(headLen); - void *pBody = (*mallocBodyFp)(bodyLen); + void *pHead = (*mallocHeadFp)(headLen, RPC_QITEM); + void *pBody = (*mallocBodyFp)(bodyLen, RPC_QITEM); if (pHead == NULL || pBody == NULL) { taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index eca529c632..17a87b1a5f 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -18,20 +18,21 @@ #include "taoserror.h" #include "tlog.h" +int64_t tsRpcQueueMemoryAllowed = 0; +int64_t tsRpcQueueMemoryUsed = 0; + typedef struct STaosQnode STaosQnode; typedef struct STaosQnode { STaosQnode *next; STaosQueue *queue; int32_t size; - int32_t reserved; + int8_t itype; + int8_t reserved[3]; char item[]; } STaosQnode; typedef struct STaosQueue { - int64_t memOfItems; - int32_t numOfItems; - int32_t threadId; STaosQnode *head; STaosQnode *tail; STaosQueue *next; // for queue set @@ -40,15 +41,17 @@ typedef struct STaosQueue { FItem itemFp; FItems itemsFp; TdThreadMutex mutex; + int64_t memOfItems; + int32_t numOfItems; } STaosQueue; typedef struct STaosQset { STaosQueue *head; STaosQueue *current; TdThreadMutex mutex; + tsem_t sem; int32_t numOfQueues; int32_t numOfItems; - tsem_t sem; } STaosQset; typedef struct STaosQall { @@ -120,6 +123,8 @@ bool taosQueueEmpty(STaosQueue *queue) { } int32_t taosQueueItemSize(STaosQueue *queue) { + if (queue == NULL) return 0; + taosThreadMutexLock(&queue->mutex); int32_t numOfItems = queue->numOfItems; taosThreadMutexUnlock(&queue->mutex); @@ -127,32 +132,51 @@ int32_t taosQueueItemSize(STaosQueue *queue) { } int64_t taosQueueMemorySize(STaosQueue *queue) { + if (queue == NULL) return 0; + taosThreadMutexLock(&queue->mutex); int64_t memOfItems = queue->memOfItems; taosThreadMutexUnlock(&queue->mutex); return memOfItems; } -void *taosAllocateQitem(int32_t size) { +void *taosAllocateQitem(int32_t size, EQItype itype) { STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); pNode->size = size; + pNode->itype = itype; if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - uTrace("item:%p, node:%p is allocated", pNode->item, pNode); + if (itype == RPC_QITEM) { + int64_t alloced = atomic_add_fetch_64(&tsRpcQueueMemoryUsed, size); + if (alloced > tsRpcQueueMemoryUsed) { + taosMemoryFree(pNode); + terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE; + return NULL; + } + uTrace("item:%p, node:%p is allocated, alloc:%" PRId64, pNode->item, pNode, alloced); + } else { + uTrace("item:%p, node:%p is allocated", pNode->item, pNode); + } + return (void *)pNode->item; } void taosFreeQitem(void *pItem) { if (pItem == NULL) return; - char *temp = pItem; - temp -= sizeof(STaosQnode); - uTrace("item:%p, node:%p is freed", pItem, temp); - taosMemoryFree(temp); + STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); + if (pNode->itype > 0) { + int64_t alloced = atomic_sub_fetch_64(&tsRpcQueueMemoryUsed, pNode->size); + uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); + } else { + uTrace("item:%p, node:%p is freed", pItem, pNode); + } + + taosMemoryFree(pNode); } void taosWriteQitem(STaosQueue *queue, void *pItem) { @@ -203,7 +227,13 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) { return code; } -STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); } +STaosQall *taosAllocateQall() { + STaosQall *qall = taosMemoryCalloc(1, sizeof(STaosQall)); + if (qall != NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + return qall; +} void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); } @@ -458,23 +488,3 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) { } taosThreadMutexUnlock(&qset->mutex); } - -int32_t taosGetQueueItemsNumber(STaosQueue *queue) { - if (!queue) return 0; - - int32_t num; - taosThreadMutexLock(&queue->mutex); - num = queue->numOfItems; - taosThreadMutexUnlock(&queue->mutex); - return num; -} - -int32_t taosGetQsetItemsNumber(STaosQset *qset) { - if (!qset) return 0; - - int32_t num = 0; - taosThreadMutexLock(&qset->mutex); - num = qset->numOfItems; - taosThreadMutexUnlock(&qset->mutex); - return num; -}