fix(stream): update the merge result check.

This commit is contained in:
Haojun Liao 2024-07-31 16:33:44 +08:00
parent 91dcf63933
commit 715b6428aa
2 changed files with 12 additions and 7 deletions

View File

@ -219,13 +219,17 @@ int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
(void) taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); void* px = taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
if (px == NULL) {
return terrno;
}
taosArrayDestroy(pBlockSrc->blocks); taosArrayDestroy(pBlockSrc->blocks);
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
taosFreeQitem(pElem); taosFreeQitem(pElem);
*pRes = dst; *pRes = dst;
return TSDB_CODE_SUCCESS; return code;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;

View File

@ -231,13 +231,14 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
if (*pInput == NULL) { if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0); ASSERT((*numOfBlocks) == 0);
*pInput = qItem; *pInput = qItem;
} else { } else { // merge current block failed, let's handle the already merged blocks.
// merge current block failed, let's handle the already merged blocks.
void* newRet = NULL; void* newRet = NULL;
int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet); int32_t code = streamQueueMergeQueueItem(*pInput, qItem, (SStreamQueueItem**)&newRet);
if (code != TSDB_CODE_SUCCESS) { if (newRet == NULL) {
if (code) {
stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, stError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks,
tstrerror(terrno)); tstrerror(code));
}
*blockSize = streamQueueItemGetSize(*pInput); *blockSize = streamQueueItemGetSize(*pInput);
if (taskLevel == TASK_LEVEL__SINK) { if (taskLevel == TASK_LEVEL__SINK) {