From 622dd10b195d8b9c123a9019e58b3b7801a0439e Mon Sep 17 00:00:00 2001
From: slzhou
Date: Wed, 24 Jan 2024 10:21:17 +0800
Subject: [PATCH] feat: add operator next function

---
Review notes (text in this section is ignored by git am):
* stopSubTablesTableMergeScan: the cleanup loop now stops at i < numSubTables. The moved code
  iterated with i <= numSubTables and therefore touched aInputs[numSubTables], while
  getSubTablesSortedBlock treats numSubTables as the number of inputs. Please confirm against
  the allocation in initSubTablesMergeInfo, which this patch does not show.
* Header comments were added to the four new functions; the last hunk's new-side line count and
  the diffstat below were updated to match (15 extra inserted lines).
* Line breaks, blank context lines and indentation of this patch were reconstructed; verify with
  `git apply --check` before applying.

 source/libs/executor/inc/executorInt.h  |   4 +-
 source/libs/executor/src/scanoperator.c | 199 +++++++++++++++++++-----
 2 files changed, 158 insertions(+), 45 deletions(-)

diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index b20e1a98c0..b86ff2f7fa 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -42,7 +42,6 @@ extern "C" {
 // #include "tstream.h"
 // #include "tstreamUpdate.h"
 #include "tlrucache.h"
-#include "tdatablock.h"
 
 typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
 
@@ -301,13 +300,14 @@ typedef struct STmsSubTableInput {
   int64_t* aTs;
 } STmsSubTableInput;
 
+typedef struct SBlockOrderInfo SBlockOrderInfo;
 typedef struct STmsSubTablesMergeInfo {
   SBlockOrderInfo* pOrderInfo;
 
   int32_t numSubTables;
   STmsSubTableInput* aInputs;
   SMultiwayMergeTreeInfo* pTree;
-  int32_t numTablesCompleted;
+  int32_t numSubTablesCompleted;
 
   int32_t numTableBlocksInMem;
   SDiskbasedBuf* pBlocksBuf;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 0f1d48c840..d73b43ee3e 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -3375,7 +3375,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu
     if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) {
       continue;
     }
-    
+
     *pSubTableHasBlock = true;
     pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid);
     pOperator->resultInfo.totalRows += pInput->pBlock->info.rows;
@@ -3415,8 +3415,8 @@ static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTables
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput) {
-  int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pInput->pageIdx);
+static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) {
+  int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex);
   void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId);
   blockDataFromBuf(pInput->pBlock, page);
   releaseBufPage(pSubTblsInfo->pBlocksBuf, page);
@@ -3424,6 +3424,8 @@ static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsIn
 }
 
 static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
+  pInfo->bGroupProcessed = false;
+
   size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
   int32_t i = pInfo->tableStartIndex + 1;
   for (; i < numOfTables; ++i) {
@@ -3443,7 +3445,7 @@ static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
     }
     if (pInput->type == SUB_TABLE_EXT_PAGES) {
       pInput->pageIdx = 0;
-      fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
+      fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
       pInput->rowIdx = 0;
     } else {
       pInput->rowIdx = 0;
@@ -3492,7 +3494,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
     fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
     if (!hasNext) {
       pInput->rowIdx = -1;
-      ++pSubTblsInfo->numTablesCompleted;
+      ++pSubTblsInfo->numSubTablesCompleted;
       continue;
     } else {
       pInput->rowIdx = 0;
@@ -3506,39 +3508,6 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo*
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t startSubTablesTms(SOperatorInfo* pOperator) {
-  STableMergeScanInfo* pInfo = pOperator->info;
-
-  initSubTablesMergeInfo(pInfo);
-  initSubTableInputs(pOperator, pInfo);
-  initSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
-
-  return TSDB_CODE_SUCCESS;
-}
-
-static int32_t stopSubTablesTms(SOperatorInfo* pOperator) {
-  STableMergeScanInfo* pInfo = pOperator->info;
-  STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
-
-  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
-  SReadHandle* pHandle = &pInfo->base.readHandle;
-  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
-  tMergeTreeDestroy(&pSubTblsInfo->pTree);
-  for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) {
-    STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
-
-    blockDataDestroy(pInput->pBlock);
-    taosArrayDestroy(pInput->aBlockPages);
-    pAPI->tsdReader.tsdReaderClose(pInput->pReader);
-  }
-  destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
-  taosMemoryFree(pSubTblsInfo->aInputs);
-
-  taosMemoryFree(pSubTblsInfo);
-  pInfo->pSubTablesMergeInfo = NULL;
-  return TSDB_CODE_SUCCESS;
-}
-
 static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
   STableMergeScanInfo* pInfo = pOperatorInfo->info;
   STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
@@ -3547,7 +3516,7 @@ static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubT
   fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
   if (!hasNext) {
     pInput->rowIdx = -1;
-    ++pSubTblsInfo->numTablesCompleted;
+    ++pSubTblsInfo->numSubTablesCompleted;
   } else {
     pInput->rowIdx = 0;
   }
@@ -3563,22 +3532,22 @@ static int32_t adjustSubTableFromExtPages(SOperatorInfo* pOperatorInfo, STmsSubT
     fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
     if (!hasNext) {
       pInput->rowIdx = -1;
-      ++pSubTblsInfo->numTablesCompleted;
+      ++pSubTblsInfo->numSubTablesCompleted;
     } else {
       saveSubTableBlock(pInfo, pSubTblsInfo, pInput);
       pInput->pageIdx = 0;
-      fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
+      fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
       pInput->rowIdx = 0;
     }
   } else {
-    fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput);
+    fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput, pInput->pageIdx);
     pInput->rowIdx = 0;
   }
 
   return TSDB_CODE_SUCCESS;
 }
 
-static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) {
+static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
   STableMergeScanInfo* pInfo = pOperatorInfo->info;
   STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
 
@@ -3623,6 +3592,150 @@ static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo,
   return TSDB_CODE_SUCCESS;
 }
 
+// Fills pResBlock with rows merged from the current group's sub-table inputs. Rows are appended one
+// at a time (append the chosen row, then advance that input) until `capacity` rows are collected or
+// all sub-tables are completed; limit/offset is then applied, and the fill is repeated only if that
+// left the block empty with input remaining. Returns pResBlock when it holds rows, otherwise NULL.
+static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
+  STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
+
+  blockDataCleanup(pResBlock);
+  bool finished = false;
+  while (true) {
+    while (1) {
+      if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
+        finished = true;
+        break;
+      }
+
+      appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
+      adjustSubTableForNextRow(pOperator, pSubTblsInfo);
+
+      if (pResBlock->info.rows >= capacity) {
+        break;
+      }
+    }
+    bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
+    if (finished || limitReached || pResBlock->info.rows > 0) {
+      break;
+    }
+  }
+  return (pResBlock->info.rows > 0) ? pResBlock : NULL;
+}
+
+// Starts the sub-table merge for the current group by running initSubTablesMergeInfo,
+// initSubTableInputs and initSubTablesMergeSort in order. Always returns TSDB_CODE_SUCCESS.
+static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
+  STableMergeScanInfo* pInfo = pOperator->info;
+
+  initSubTablesMergeInfo(pInfo);
+
+  initSubTableInputs(pOperator, pInfo);
+
+  initSubTablesMergeSort(pInfo->pSubTablesMergeInfo);
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// Releases pInfo->pSubTablesMergeInfo: the merge tree, every input in [0, numSubTables) (its block,
+// page-id array and reader), the disk-based buffer and the inputs array, then the struct itself.
+// Leaves pInfo->pSubTablesMergeInfo set to NULL and returns TSDB_CODE_SUCCESS.
+static int32_t stopSubTablesTableMergeScan(SOperatorInfo* pOperator) {
+  STableMergeScanInfo* pInfo = pOperator->info;
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
+
+  SReadHandle* pHandle = &pInfo->base.readHandle;
+  SStorageAPI* pAPI = &pTaskInfo->storageAPI;
+
+  tMergeTreeDestroy(&pSubTblsInfo->pTree);
+
+  for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
+    STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
+
+    blockDataDestroy(pInput->pBlock);
+    taosArrayDestroy(pInput->aBlockPages);
+    pAPI->tsdReader.tsdReaderClose(pInput->pReader);
+  }
+
+  destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
+  taosMemoryFree(pSubTblsInfo->aInputs);
+
+  taosMemoryFree(pSubTblsInfo);
+  pInfo->pSubTablesMergeInfo = NULL;
+  return TSDB_CODE_SUCCESS;
+}
+
+// Operator "next" function for the sub-tables merge scan; returns one block per call, or NULL when
+// the operator is done. Opens the operator and, while pInfo->hasGroupId is still unset, starts the
+// merge for the group at table index 0. When the current group yields no block, its merge state is
+// stopped and the group starting at tableEndIndex + 1 is started with the limit info reset. If a
+// group produced no block at all and needCountEmptyTable is set, the block returned by
+// getOneRowResultBlock is emitted for it. Exits via T_LONG_JMP if open fails or the task is killed.
+SSDataBlock* doTableMergeScanSubTables(SOperatorInfo* pOperator) {
+  if (pOperator->status == OP_EXEC_DONE) {
+    return NULL;
+  }
+
+  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+  STableMergeScanInfo* pInfo = pOperator->info;
+
+  int32_t code = pOperator->fpSet._openFn(pOperator);
+  if (code != TSDB_CODE_SUCCESS) {
+    T_LONG_JMP(pTaskInfo->env, code);
+  }
+
+  size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
+  if (!pInfo->hasGroupId) {
+    pInfo->hasGroupId = true;
+
+    if (tableListSize == 0) {
+      setOperatorCompleted(pOperator);
+      return NULL;
+    }
+    pInfo->tableStartIndex = 0;
+    pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
+    startSubTablesTableMergeScan(pOperator);
+  }
+
+  SSDataBlock* pBlock = NULL;
+  while (pInfo->tableStartIndex < tableListSize) {
+    if (isTaskKilled(pTaskInfo)) {
+      T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
+    }
+
+    pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
+    if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
+      STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
+      pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
+    }
+    if (pBlock != NULL) {
+      pBlock->info.id.groupId = pInfo->groupId;
+      pOperator->resultInfo.totalRows += pBlock->info.rows;
+      pInfo->bGroupProcessed = true;
+      return pBlock;
+    } else {
+      // Data of this group are all dumped, let's try the next group
+      stopSubTablesTableMergeScan(pOperator);
+      if (pInfo->tableEndIndex >= tableListSize - 1) {
+        setOperatorCompleted(pOperator);
+        break;
+      }
+
+      pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
+      pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
+      startSubTablesTableMergeScan(pOperator);
+      resetLimitInfoForNextGroup(&pInfo->limitInfo);
+    }
+  }
+
+  return pBlock;
+}
+
+
+// table merge scan : one reader, save blocks to disk
 static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
   int64_t nRows = 0;
   void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));