From 3f2b469b5ec0a086138c0c1935a8f9845892173d Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 24 Jan 2024 09:12:27 +0800 Subject: [PATCH] feat: refactor code --- source/libs/executor/src/scanoperator.c | 167 +++++++++++++++++------- 1 file changed, 122 insertions(+), 45 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0f87175fd2..0f1d48c840 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3342,26 +3342,47 @@ static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* p return ret; } -static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pHasNext) { +static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) { STableMergeScanInfo* pInfo = pOperator->info; STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; const SStorageAPI* pAPI= &pTaskInfo->storageAPI; blockDataCleanup(pInput->pBlock); - bool hasNext = false; - int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext); - *pHasNext = hasNext; - if (!hasNext) { - pInput->rowIdx = -1; - ++pSubTblsInfo->numTablesCompleted; - return code; + while (true) { + bool hasNext = false; + int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext); + if (code != 0) { + pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader); + T_LONG_JMP(pTaskInfo->env, code); + } + if (!hasNext || isTaskKilled(pTaskInfo)) { + if (isTaskKilled(pTaskInfo)) { + pAPI->tsdReader.tsdReaderReleaseDataBlock(pInput->pReader); + } + break; + } + + uint32_t status = 0; + code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status); + if (code != 0) { + T_LONG_JMP(pTaskInfo->env, code); + } + if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { + break; + } + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) { + continue; + } + + *pSubTableHasBlock = true; + pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid); + pOperator->resultInfo.totalRows += pInput->pBlock->info.rows; + return TSDB_CODE_SUCCESS; } - uint32_t status = 0; - code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status); - - return code; + *pSubTableHasBlock = false; + return TSDB_CODE_SUCCESS; } static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo, @@ -3399,7 +3420,6 @@ static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsIn void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId); blockDataFromBuf(pInput->pBlock, page); releaseBufPage(pSubTblsInfo->pBlocksBuf, page); - pInput->rowIdx = 0; return TSDB_CODE_SUCCESS; } @@ -3424,6 +3444,7 @@ static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { if (pInput->type == SUB_TABLE_EXT_PAGES) { pInput->pageIdx = 0; fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); + pInput->rowIdx = 0; } else { pInput->rowIdx = 0; pInput->pageIdx = -1; @@ -3435,40 +3456,62 @@ static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { return TSDB_CODE_SUCCESS; } -static int32_t startSubTablesTms(SOperatorInfo* pOperator) { - STableMergeScanInfo* pInfo = pOperator->info; - STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SReadHandle* pHandle = &pInfo->base.readHandle; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; +static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { + STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo)); + pInfo->pSubTablesMergeInfo = pSubTblsInfo; setGroupStartEndIndex(pInfo); pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0); pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput)); + int32_t bufPageSize = pInfo->bufPageSize; int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); - for (int32_t i = pInfo->tableStartIndex; i <= pInfo->tableEndIndex; ++i) { - int32_t idx = i - pInfo->tableStartIndex; - STmsSubTableInput* pInput = pSubTblsInfo->aInputs + idx; + + pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables; + return TSDB_CODE_SUCCESS; +} + +static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SReadHandle* pHandle = &pInfo->base.readHandle; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + + STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; + + for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; pInput->type = SUB_TABLE_MEM_BLOCK; pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false); - STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock, (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); bool hasNext = true; fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); - - if (idx + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) { + if (!hasNext) { + pInput->rowIdx = -1; + ++pSubTblsInfo->numTablesCompleted; + continue; + } else { + pInput->rowIdx = 0; + pInput->pageIdx = -1; + } + if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) { saveSubTableBlock(pInfo, pSubTblsInfo, pInput); pInput->type = SUB_TABLE_EXT_PAGES; } - } + } + return TSDB_CODE_SUCCESS; +} - initSubTablesMergeSort(pSubTblsInfo); +static int32_t startSubTablesTms(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + + initSubTablesMergeInfo(pInfo); + initSubTableInputs(pOperator, pInfo); + initSubTablesMergeSort(pInfo->pSubTablesMergeInfo); return TSDB_CODE_SUCCESS; } @@ -3490,38 +3533,72 @@ static int32_t stopSubTablesTms(SOperatorInfo* pOperator) { } destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf); taosMemoryFree(pSubTblsInfo->aInputs); + + taosMemoryFree(pSubTblsInfo); + pInfo->pSubTablesMergeInfo = NULL; return TSDB_CODE_SUCCESS; } +static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) { + STableMergeScanInfo* pInfo = pOperatorInfo->info; + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); + + bool hasNext = true; + fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); + if (!hasNext) { + pInput->rowIdx = -1; + ++pSubTblsInfo->numTablesCompleted; + } else { + pInput->rowIdx = 0; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSubTableFromExtPages(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) { + STableMergeScanInfo* pInfo = pOperatorInfo->info; + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); + if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) { + bool hasNext = true; + fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); + if (!hasNext) { + pInput->rowIdx = -1; + ++pSubTblsInfo->numTablesCompleted; + } else { + saveSubTableBlock(pInfo, pSubTblsInfo, pInput); + pInput->pageIdx = 0; + fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); + pInput->rowIdx = 0; + + } + } else { + fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); + pInput->rowIdx = 0; + } + return TSDB_CODE_SUCCESS; +} + static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) { STableMergeScanInfo* pInfo = pOperatorInfo->info; - STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); + if (pInput->rowIdx < pInput->pBlock->info.rows - 1) { ++pInput->rowIdx; - return TSDB_CODE_SUCCESS; } else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) { if (pInput->type == SUB_TABLE_MEM_BLOCK) { - bool hasNext = true; - fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); + adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo); } else if (pInput->type == SUB_TABLE_EXT_PAGES) { ++pInput->pageIdx; - if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) { - bool hasNext = true; - fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); - if (hasNext) { - saveSubTableBlock(pInfo, pSubTblsInfo, pInput); - pInput->pageIdx = 0; - fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); - } - } else { - fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); - } + adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo); + } + if (pInput->rowIdx != -1) { + SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + pInput->aTs = (int64_t*)col->pData; } - SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); - pInput->aTs = (int64_t*)col->pData; } + tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree)); + return TSDB_CODE_SUCCESS; }