diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 89db5761c2..2dcb555834 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2829,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - tsortDestroySortHandle(pInfo->pSortHandle); + size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + + for (int32_t i = 0; i < numReaders; ++i) { + STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); + blockDataDestroy(param->inputBlock); + } taosArrayClear(pInfo->sortSourceParams); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) { + tsortDestroySortHandle(pInfo->pSortHandle); + + for (int32_t i = 0; i < numReaders; ++i) { STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); tsdbReaderClose(reader); } - taosArrayDestroy(pInfo->dataReaders); pInfo->dataReaders = NULL; return TSDB_CODE_SUCCESS; } -SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { - return NULL; - } - - blockDataEnsureCapacity(p, capacity); + blockDataCleanup(pResBlock); + blockDataEnsureCapacity(pResBlock, capacity); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -2859,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa break; } - appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { + appendOneRowToDataBlock(pResBlock, pTupleHandle); + if (pResBlock->info.rows >= capacity) { break; } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); - return (p->info.rows > 0) ? p : NULL; + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); + return (pResBlock->info.rows > 0) ? pResBlock : NULL; } SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { @@ -2895,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = NULL; while (pInfo->tableStartIndex < tableListSize) { - pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { pBlock->info.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -2919,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); + taosArrayDestroy(pTableScanInfo->sortSourceParams); for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i);