diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e15708e357..ab33af6acf 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -76,6 +76,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n */ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); +/** + * @brief Cleanup SSDataBlock for StreamScanInfo + * + * @param tinfo + */ +void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); + /** * Update the table id list, add or remove. * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index d1c8327728..fd57441847 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1336,6 +1336,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); + tdCleanupStreamInputDataBlock(pItem->taskInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo); // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b2fe04f5a2..2d5bf357b0 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -83,6 +83,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayClear(p->pDataBlock); taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); + } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { @@ -93,6 +94,29 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + if (!pTaskInfo || !pTaskInfo->pRoot || pTaskInfo->pRoot->numOfDownstream <= 0) { + return; + } + SOperatorInfo* pOptrInfo = pTaskInfo->pRoot->pDownstream[0]; + + if (pOptrInfo->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = pOptrInfo->info; + if (pInfo->blockType = STREAM_INPUT__DATA_BLOCK) { + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBlockLists); ++i) { + SSDataBlock* p = *(SSDataBlock**)taosArrayGet(pInfo->pBlockLists, i); + taosArrayDestroy(p->pDataBlock); + taosMemoryFreeClear(p); + } + } else { + ASSERT(0); + } + } else { + ASSERT(0); + } +} + int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_QRY_APP_ERROR;