From 1b2636028a5220d5f55231a297f5ae8417d5e3a0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 21 Jul 2023 23:05:42 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- include/libs/stream/tstream.h | 1 + source/dnode/snode/src/snode.c | 1 + source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/streamDispatch.c | 3 ++- source/libs/stream/src/streamTask.c | 5 +++++ 5 files changed, 10 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7a516190b..b169d82574 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -337,6 +337,7 @@ struct SStreamTask { SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend SArray* pRspMsgList; + TdThreadMutex lock; // the followings attributes don't be serialized int32_t notReadyTasks; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6288a048f7..8a7b61135b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -91,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); + taosThreadMutexInit(&pTask->lock, NULL); streamSetupScheduleTrigger(pTask); qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aeb9b55d12..5cae6793be 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -921,6 +921,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->status.taskStatus = TASK_STATUS__NORMAL; } + taosThreadMutexInit(&pTask->lock, NULL); streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a22a9ec534..88af841f05 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -691,10 +691,11 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); info.msg.info = *pRpcInfo; - // todo: fix race condition here + taosThreadMutexLock(&pTask->lock); if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); } + taosThreadMutexUnlock(&pTask->lock); taosArrayPush(pTask->pRspMsgList, &info); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ca4586a1b4..d54d5fa8b8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,5 +251,10 @@ void tFreeStreamTask(SStreamTask* pTask) { tSimpleHashCleanup(pTask->pNameMap); } + if (pTask->pRspMsgList != NULL) { + pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList); + } + + taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); }