From 3b84053893589f976f97d2dda09a972ed520af6e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 23 Nov 2022 12:32:02 +0800 Subject: [PATCH 1/2] fix(stream): memory leak --- source/dnode/vnode/src/tq/tq.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5e35e34b87..e6f6f3ceaf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1269,6 +1269,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) { qError("stream task input del failed, task id %d", pTask->taskId); + atomic_sub_fetch_32(pRef, 1); taosFreeQitem(pRefBlock); continue; } @@ -1286,7 +1287,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); if (ref == 0) { - taosMemoryFree(pDelBlock); + blockDataDestroy(pDelBlock); taosMemoryFree(pRef); } From a02919495bca5bed1d583d42fa874c36075538d7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 24 Nov 2022 03:59:50 +0800 Subject: [PATCH 2/2] fix(stream): set tbname for parition --- source/libs/executor/src/groupoperator.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 26a5f6838d..245b21a75b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -906,6 +906,9 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { } else { pDest->info.parTbName[0] = 0; } + if (pParInfo->groupId && pDest->info.parTbName[0]) { + streamStatePutParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, pDest->info.parTbName); + } /*printf("\n\n set name %s\n\n", pDest->info.parTbName);*/ blockDataDestroy(pTmpBlock); blockDataDestroy(pResBlock);