From 305d4a962aec7e4f43379a2a29dc4310bb3ed88f Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 24 Jan 2024 17:16:18 +0800 Subject: [PATCH] fix: page block and reader block for each child table input --- source/libs/executor/inc/executorInt.h | 4 +- source/libs/executor/src/scanoperator.c | 49 +++++++++++++++---------- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 013fe41f4e..f59f0b9c57 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -292,8 +292,10 @@ typedef enum ESubTableInputType { typedef struct STmsSubTableInput { STsdbReader* pReader; ESubTableInputType type; - SSDataBlock* pBlock; + SSDataBlock* pReaderBlock; + SArray* aBlockPages; + SSDataBlock* pPageBlock; int32_t pageIdx; int32_t rowIdx; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 34a70f28e4..54552e8289 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3319,6 +3319,7 @@ _error: // TODO: error processing, memory freeing // TODO: add log for error and perf // TODO: tsdb reader open/close dynamically +// TODO: blockdata deep cleanup static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) { int32_t left = *(int32_t*)pLeft; @@ -3349,7 +3350,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; const SStorageAPI* pAPI= &pTaskInfo->storageAPI; - blockDataCleanup(pInput->pBlock); + blockDataCleanup(pInput->pReaderBlock); pInfo->base.dataReader = pInput->pReader; @@ -3369,7 +3370,7 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu } uint32_t status = 0; - code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status); + code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status); if (code != 0) { pInfo->base.dataReader = NULL; T_LONG_JMP(pTaskInfo->env, code); @@ -3377,13 +3378,13 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) { break; } - if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pBlock->info.rows == 0) { + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) { continue; } *pSubTableHasBlock = true; - pInput->pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pBlock->info.id.uid); - pOperator->resultInfo.totalRows += pInput->pBlock->info.rows; + pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid); + pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows; pInfo->base.dataReader = NULL; return TSDB_CODE_SUCCESS; } @@ -3394,9 +3395,9 @@ static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSu static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput) { - pInput->aBlockPages = taosArrayInit(4, sizeof(int32_t)); + taosArrayClear(pInput->aBlockPages); int32_t start = 0; - SSDataBlock* pDataBlock = pInput->pBlock; + SSDataBlock* pDataBlock = pInput->pReaderBlock; while (start < pDataBlock->info.rows) { int32_t stop = 0; blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize); @@ -3418,14 +3419,16 @@ static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTables blockDataDestroy(p); start = stop + 1; } - blockDataCleanup(pInput->pBlock); + blockDataCleanup(pInput->pReaderBlock); return TSDB_CODE_SUCCESS; } static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput, int32_t pageIndex) { int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pageIndex); void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId); - blockDataFromBuf(pInput->pBlock, page); + + blockDataFromBuf(pInput->pPageBlock, page); + releaseBufPage(pSubTblsInfo->pBlocksBuf, page); return TSDB_CODE_SUCCESS; } @@ -3458,7 +3461,8 @@ static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { pInput->rowIdx = 0; pInput->pageIdx = -1; } - SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock; + SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); pInput->aTs = (int64_t*)col->pData; } tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn); @@ -3479,7 +3483,7 @@ static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) { int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); - pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables; + pSubTblsInfo->numTableBlocksInMem = 0; return TSDB_CODE_SUCCESS; } @@ -3493,9 +3497,10 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; pInput->type = SUB_TABLE_MEM_BLOCK; - pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false); + pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false); + pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false); STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex); - pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pBlock, + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInput->pReaderBlock, (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); bool hasNext = true; fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); @@ -3508,6 +3513,7 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInput->pageIdx = -1; } if (i + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) { + pInput->aBlockPages = taosArrayInit(32, sizeof(int32_t)); saveSubTableBlock(pInfo, pSubTblsInfo, pInput); pInput->type = SUB_TABLE_EXT_PAGES; } @@ -3518,7 +3524,6 @@ static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) { STableMergeScanInfo* pInfo = pOperatorInfo->info; STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); - bool hasNext = true; fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); if (!hasNext) { @@ -3558,9 +3563,10 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab STableMergeScanInfo* pInfo = pOperatorInfo->info; STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); - if (pInput->rowIdx < pInput->pBlock->info.rows - 1) { + SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock; + if (pInput->rowIdx < pInputBlock->info.rows - 1) { ++pInput->rowIdx; - } else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) { + } else if (pInput->rowIdx == pInputBlock->info.rows -1 ) { if (pInput->type == SUB_TABLE_MEM_BLOCK) { adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo); } else if (pInput->type == SUB_TABLE_EXT_PAGES) { @@ -3568,7 +3574,7 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab adjustSubTableFromExtPages(pOperatorInfo, pSubTblsInfo); } if (pInput->rowIdx != -1) { - SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); pInput->aTs = (int64_t*)col->pData; } } @@ -3580,11 +3586,13 @@ static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTab static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); + SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock; + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pSrcColInfo = taosArrayGet(pInput->pBlock->pDataBlock, i); - bool isNull = colDataIsNull(pSrcColInfo, pInput->pBlock->info.rows, pInput->rowIdx, NULL); + SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i); + bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL); if (isNull) { colDataSetVal(pColInfo, pBlock->info.rows, NULL, true); @@ -3648,7 +3656,8 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) { for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; - blockDataDestroy(pInput->pBlock); + blockDataDestroy(pInput->pReaderBlock); + blockDataDestroy(pInput->pPageBlock); taosArrayDestroy(pInput->aBlockPages); pInfo->base.readerAPI.tsdReaderClose(pInput->pReader); pInput->pReader = NULL;