From 715b6428aaae8a0d10b1e35cab8599f9a0a657ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 31 Jul 2024 16:33:44 +0800 Subject: [PATCH] fix(stream): update the merge result check. --- source/libs/stream/src/streamData.c | 8 ++++++-- source/libs/stream/src/streamQueue.c | 11 ++++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 00a62d4773..57e5322e38 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -219,13 +219,17 @@ int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; - (void) taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); + void* px = taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); + if (px == NULL) { + return terrno; + } + taosArrayDestroy(pBlockSrc->blocks); streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); taosFreeQitem(pElem); *pRes = dst; - return TSDB_CODE_SUCCESS; + return code; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 537062b04e..5e538c1e42 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -231,13 +231,14 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte if (*pInput == NULL) { ASSERT((*numOfBlocks) == 0); *pInput = qItem; - } else { - // merge current block failed, let's handle the already merged blocks. + } else { // merge current block failed, let's handle the already merged blocks. void* newRet = NULL; int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet); - if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, - tstrerror(terrno)); + if (newRet == NULL) { + if (code) { + stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, + tstrerror(code)); + } *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) {