From dd70c63dfe1d320814c8e35983c50cd2a256c653 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 23 Jan 2024 16:34:14 +0800 Subject: [PATCH] feat: table merge scan with one less disk usage --- source/libs/executor/inc/executorInt.h | 32 ++++ source/libs/executor/src/scanoperator.c | 235 ++++++++++++++++++++++++ 2 files changed, 267 insertions(+) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 64c14456b6..b20e1a98c0 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -42,6 +42,7 @@ extern "C" { // #include "tstream.h" // #include "tstreamUpdate.h" #include "tlrucache.h" +#include "tdatablock.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); @@ -284,6 +285,34 @@ typedef struct STableScanInfo { bool needCountEmptyTable; } STableScanInfo; +typedef enum ESubTableInputType { + SUB_TABLE_MEM_BLOCK, + SUB_TABLE_EXT_PAGES, +} ESubTableInputType; + +typedef struct STmsSubTableInput { + STsdbReader* pReader; + ESubTableInputType type; + SSDataBlock* pBlock; + SArray* aBlockPages; + + int32_t pageIdx; + int32_t rowIdx; + int64_t* aTs; +} STmsSubTableInput; + +typedef struct STmsSubTablesMergeInfo { + SBlockOrderInfo* pOrderInfo; + + int32_t numSubTables; + STmsSubTableInput* aInputs; + SMultiwayMergeTreeInfo* pTree; + int32_t numTablesCompleted; + + int32_t numTableBlocksInMem; + SDiskbasedBuf* pBlocksBuf; +} STmsSubTablesMergeInfo; + typedef struct STableMergeScanInfo { int32_t tableStartIndex; int32_t tableEndIndex; @@ -318,6 +347,9 @@ typedef struct STableMergeScanInfo { SSDataBlock* nextDurationBlocks[2]; bool rtnNextDurationBlocks; int32_t nextDurationBlocksIdx; + + + STmsSubTablesMergeInfo* pSubTablesMergeInfo; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 09f4f6ac3c..87749d3c81 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3311,6 +3311,241 @@ _error: return NULL; } +// table merge scan operator + +// table merge scan operator +// TODO: limit / duration optimization +// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish +// TODO: error processing, memory freeing +// TODO: add log for error and perf + +static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) { + int32_t left = *(int32_t*)pLeft; + int32_t right = *(int32_t*)pRight; + STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param; + + int32_t leftIdx = pInfo->aInputs[left].rowIdx; + int32_t rightIdx = pInfo->aInputs[right].rowIdx; + + if (leftIdx == -1) { + return 1; + } else if (rightIdx == -1) { + return -1; + } + + int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx]; + int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx]; + int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0); + if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) { + ret = -1 * ret; + } + return ret; +} + +static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pHasNext) { + const STableMergeScanInfo* pInfo = pOperator->info; + STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const SStorageAPI* pAPI= &pTaskInfo->storageAPI; + + blockDataCleanup(pInput->pBlock); + bool hasNext = false; + int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInput->pReader, &hasNext); + *pHasNext = hasNext; + + if (!hasNext) { + pInput->rowIdx = -1; + ++pSubTblsInfo->numTablesCompleted; + return code; + } + uint32_t status = 0; + code = loadDataBlock(pOperator, &pInfo->base, pInput->pBlock, &status); + + return code; +} + +static int32_t saveSubTableBlock(const STableMergeScanInfo* pInfo, STmsSubTablesMergeInfo* pSubTblsInfo, + STmsSubTableInput* pInput) { + pInput->aBlockPages = taosArrayInit(4, sizeof(int32_t)); + int32_t start = 0; + SSDataBlock* pDataBlock = pInput->pBlock; + while (start < pDataBlock->info.rows) { + int32_t stop = 0; + blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize); + SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); + + int32_t pageId = -1; + void* pPage = getNewBufPage(pSubTblsInfo->pBlocksBuf, &pageId); + + taosArrayPush(pInput->aBlockPages, &pageId); + + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); + ASSERT(size <= getBufPageSize(pSubTblsInfo->pBlocksBuf)); + + blockDataToBuf(pPage, p); + + setBufPageDirty(pPage, true); + releaseBufPage(pSubTblsInfo->pBlocksBuf, pPage); + + blockDataDestroy(p); + start = stop + 1; + } + blockDataCleanup(pInput->pBlock); + return TSDB_CODE_SUCCESS; +} + +static int32_t fetchNextSubTableBlockFromPage(STmsSubTablesMergeInfo* pSubTblsInfo, STmsSubTableInput* pInput) { + int32_t pageId = *(int32_t*)taosArrayGet(pInput->aBlockPages, pInput->pageIdx); + void* page = getBufPage(pSubTblsInfo->pBlocksBuf, pageId); + blockDataFromBuf(pInput->pBlock, page); + releaseBufPage(pSubTblsInfo->pBlocksBuf, page); + pInput->rowIdx = 0; + return TSDB_CODE_SUCCESS; +} + +static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) { + size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); + int32_t i = pInfo->tableStartIndex + 1; + for (; i < numOfTables; ++i) { + STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + if (tableKeyInfo->groupId != pInfo->groupId) { + break; + } + } + pInfo->tableEndIndex = i - 1; +} + +static int32_t initSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) { + for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) { + STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i; + if (pInput->rowIdx == -1) { + continue; + } + if (pInput->type == SUB_TABLE_EXT_PAGES) { + pInput->pageIdx = 0; + fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); + } else { + pInput->rowIdx = 0; + pInput->pageIdx = -1; + } + SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + pInput->aTs = (int64_t*)col->pData; + } + tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn); + return TSDB_CODE_SUCCESS; +} + +static int32_t startSubTablesTms(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SReadHandle* pHandle = &pInfo->base.readHandle; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + + setGroupStartEndIndex(pInfo); + + pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0); + pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1; + pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput)); + int32_t bufPageSize = pInfo->bufPageSize; + int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize; + createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir); + for (int32_t i = pInfo->tableStartIndex; i <= pInfo->tableEndIndex; ++i) { + int32_t idx = i - pInfo->tableStartIndex; + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + idx; + pInput->type = SUB_TABLE_MEM_BLOCK; + pInput->pBlock = createOneDataBlock(pInfo->pResBlock, false); + STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); + pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, keyInfo, 1, pInfo->pReaderBlock, + (void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL); + bool hasNext = true; + fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext); + + if (idx + 1 > pSubTblsInfo->numTableBlocksInMem && hasNext) { + saveSubTableBlock(pInfo, pSubTblsInfo, pInput); + pInput->type = SUB_TABLE_EXT_PAGES; + } + } + + initSubTablesMergeSort(pSubTblsInfo); + + return TSDB_CODE_SUCCESS; +} + +static int32_t stopSubTablesTms(SOperatorInfo* pOperator) { + STableMergeScanInfo* pInfo = pOperator->info; + STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo; + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SReadHandle* pHandle = &pInfo->base.readHandle; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + tMergeTreeDestroy(pSubTblsInfo->pTree); + for (int32_t i = 0; i <= pSubTblsInfo->numSubTables; ++i) { + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i; + + blockDataDestroy(pInput->pBlock); + taosArrayDestroy(pInput->aBlockPages); + pAPI->tsdReader.tsdReaderClose(pInput->pReader); + } + destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf); + taosMemoryFree(pSubTblsInfo->aInputs); + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo, int32_t *pNumCompleted) { + STableMergeScanInfo* pInfo = pOperatorInfo->info; + + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); + if (pInput->rowIdx < pInput->pBlock->info.rows - 1) { + ++pInput->rowIdx; + return TSDB_CODE_SUCCESS; + } else if (pInput->rowIdx == pInput->pBlock->info.rows -1 ) { + if (pInput->type == SUB_TABLE_MEM_BLOCK) { + bool hasNext = true; + fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); + } else if (pInput->type == SUB_TABLE_EXT_PAGES) { + ++pInput->pageIdx; + if (pInput->pageIdx >= taosArrayGetSize(pInput->aBlockPages)) { + bool hasNext = true; + fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext); + if (hasNext) { + saveSubTableBlock(pInfo, pSubTblsInfo, pInput); + pInput->pageIdx = 0; + fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); + } + } else { + fetchNextSubTableBlockFromPage(pSubTblsInfo, pInput); + } + } + SColumnInfoData* col = taosArrayGet(pInput->pBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId); + pInput->aTs = (int64_t*)col->pData; + } + tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree)); + return TSDB_CODE_SUCCESS; +} + +static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) { + STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree); + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SColumnInfoData* pSrcColInfo = taosArrayGet(pInput->pBlock->pDataBlock, i); + bool isNull = colDataIsNull(pSrcColInfo, pInput->pBlock->info.rows, pInput->rowIdx, NULL); + + if (isNull) { + colDataSetVal(pColInfo, pBlock->info.rows, NULL, true); + } else { + if (!pSrcColInfo->pData) continue; + char* pData = colDataGetData(pSrcColInfo, pInput->rowIdx); + colDataSetVal(pColInfo, pBlock->info.rows, pData, false); + } + } + + pBlock->info.rows += 1; + return TSDB_CODE_SUCCESS; +} + static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { int64_t nRows = 0; void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));