From 6a98b11beffc48cbcc175b89805877f30674a021 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 25 Sep 2023 23:10:33 +0800 Subject: [PATCH] fix(stream): fix invalid read and memory leak. --- source/dnode/vnode/src/tq/tqSink.c | 2 ++ source/libs/stream/src/streamData.c | 10 +++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 81c3d3d07d..9158702284 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -824,6 +824,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { bool hasSubmit = false; for (int32_t i = 0; i < numOfBlocks; i++) { if (streamTaskShouldStop(&pTask->status)) { + taosHashCleanup(pTableIndexMap); + tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE); return; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index a0fee6be8f..80927b36b9 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -165,17 +165,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); taosArrayDestroy(pBlockSrc->blocks); - taosFreeQitem(pElem); - streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); + + taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; streamMergeSubmit(pMerged, pBlockSrc); - taosFreeQitem(pElem); - streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); + + taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); @@ -188,9 +188,9 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); + taosFreeQitem(dst); taosFreeQitem(pElem); - return (SStreamQueueItem*)pMerged; } else { stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type);