From 08553b778892081b1189057146bd3193c7a53758 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 11:18:10 +0800 Subject: [PATCH 1/3] mem leak --- source/libs/executor/src/filloperator.c | 1 + source/libs/executor/src/streamtimewindowoperator.c | 3 +++ 2 files changed, 4 insertions(+) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4cf3b3239d..53fef3b7e3 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -838,6 +838,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); pFillSup->next.key = pFillSup->cur.key; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index a6326be62f..4ead0df159 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1535,6 +1535,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2861,6 +2862,7 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { @@ -2874,6 +2876,7 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); blockDataDestroy(pInfo->pCheckpointRes); taosMemoryFreeClear(param); From 957c7f872b690d7fe6a9fc3fd346d8f3f56ff3b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 11:43:18 +0800 Subject: [PATCH 2/3] fix(stream): update the agg task status check, when transferring state. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3d5e0a8f29..c58da7d3f9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -308,7 +308,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING); } else { - ASSERT(status == TASK_STATUS__SCAN_HISTORY); + ASSERT(status == TASK_STATUS__NORMAL); pStreamTask->status.taskStatus = TASK_STATUS__HALT; qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } From caf20d7b5d88a53c2cb40702f9667d7930e00348 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Aug 2023 17:16:34 +0800 Subject: [PATCH 3/3] fix(stream): release the stream task. --- source/dnode/vnode/src/tq/tqRestore.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cfef019e4e..7ac7d7606b 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -318,10 +318,12 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } taosThreadMutexLock(&pTask->lock); + pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pStreamMeta, pTask); continue; }