From c705a71bd9eaf44feadf8b7137d3afb567315081 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sun, 10 Dec 2023 22:25:46 +0800 Subject: [PATCH] feat: use disk based buf for src block storage --- source/libs/executor/inc/executorInt.h | 7 +- source/libs/executor/src/scanoperator.c | 109 +++++++++--------------- 2 files changed, 40 insertions(+), 76 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 9a9338a6c7..7a9160b392 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -274,12 +274,7 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STmsSortRowIdInfo { - int32_t blkId; - int64_t dataFileOffset; - TdFilePtr idxFile; - char idxPath[PATH_MAX]; - TdFilePtr dataFile; - char dataPath[PATH_MAX]; + SDiskbasedBuf* pExtSrcBlkBuf; SSHashObj* pBlkDataHash; // blkId->SSDataBlock* } STmsSortRowIdInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 469d119523..6f2e19e694 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3220,36 +3220,8 @@ _error: } // ========================= table merge scan -typedef struct STmsSortBlockInfo { - int32_t blkId; - int32_t length; - int64_t offset; -} STmsSortBlockInfo; - -static int32_t saveSourceBlock(STmsSortRowIdInfo* pSortInfo, const SSDataBlock* pSrcBlock, int32_t *pSzBlk) { - int32_t szBlk = blockDataGetSize(pSrcBlock) + sizeof(int32_t) + taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(int32_t); - char* buf = taosMemoryMalloc(szBlk); - if (buf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - blockDataToBuf(buf, pSrcBlock); - taosLSeekFile(pSortInfo->dataFile, pSortInfo->dataFileOffset, SEEK_SET); - taosWriteFile(pSortInfo->dataFile, buf, szBlk); - taosMemoryFree(buf); - - STmsSortBlockInfo info = {.blkId = pSortInfo->blkId - , .offset = pSortInfo->dataFileOffset, .length = szBlk}; - taosLSeekFile(pSortInfo->idxFile, pSortInfo->blkId*sizeof(STmsSortBlockInfo), SEEK_SET); - taosWriteFile(pSortInfo->idxFile, &info, sizeof(info)); - - *pSzBlk = szBlk; - - return 0; -} - -static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo, - const SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { - const STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; +static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { + STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; int32_t nRows = pSrcBlock->info.rows; pSortInputBlk->info.window = pSrcBlock->info.window; @@ -3262,32 +3234,41 @@ static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo, colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info); SColumnInfoData* blkIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1); - colDataSetNItems(blkIdCol, 0, (char*)&pSortInfo->blkId, nRows, false); - SColumnInfoData* rowIdxCol = taosArrayGet(pSortInputBlk->pDataBlock, 2); - for (int32_t i = 0; i < nRows; ++i) { - colDataSetInt32(rowIdxCol, i, &i); + + int32_t start = 0; + while (start < pSrcBlock->info.rows) { + int32_t stop = 0; + blockDataSplitRows(pSrcBlock, pSrcBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize); + SSDataBlock* p = blockDataExtractBlock(pSrcBlock, start, stop-start+1); + + int32_t pageId = -1; + void* pPage = getNewBufPage(pSortInfo->pExtSrcBlkBuf, &pageId); + + int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t); + ASSERT(size <= getBufPageSize(pSortInfo->pExtSrcBlkBuf)); + + blockDataToBuf(pPage, p); + + setBufPageDirty(pPage, true); + releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage); + + blockDataDestroy(p); + uInfo("sort input block pageId %d start %d, stop %d", pageId, start, stop); + colDataSetNItems(blkIdCol, start, (char*)&pageId, stop-start+1, false); + + for (int32_t i = start; i <= stop; ++i) { + int32_t rowIdx = i - start; + colDataSetInt32(rowIdxCol, i, &rowIdx); + } + start = stop + 1; } pSortInputBlk->info.rows = nRows; + return 0; } -static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) { - //TODO: batch save - int32_t code = 0; - STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; - int32_t szBlk = 0; - code = saveSourceBlock(pSortInfo, pSrcBlock, &szBlk); - - fillSortInputBlock(pInfo, pSrcBlock, pSortInputBlk); - - ++pSortInfo->blkId; - pSortInfo->dataFileOffset = ((pSortInfo->dataFileOffset + szBlk) + 4096) & ~4096; - - return code; -} - static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock) { STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo; @@ -3297,16 +3278,10 @@ static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, *ppBlock = *(SSDataBlock**)pBlkHashVal; } else { - STmsSortBlockInfo blkInfo = {0}; - - taosLSeekFile(pSortInfo->idxFile, blockId * sizeof(STmsSortBlockInfo), SEEK_SET); - taosReadFile(pSortInfo->idxFile, &blkInfo, sizeof(STmsSortBlockInfo)); - taosLSeekFile(pSortInfo->dataFile, blkInfo.offset, SEEK_SET); - char* buf = taosMemoryMalloc(blkInfo.length); - taosReadFile(pSortInfo->dataFile, buf, blkInfo.length); + void* pPage = getBufPage(pSortInfo->pExtSrcBlkBuf, blockId); SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false); - blockDataFromBuf(pBlock, buf); - taosMemoryFree(buf); + blockDataFromBuf(pBlock, pPage); + releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage); *ppBlock = pBlock; tSimpleHashPut(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId), &pBlock, sizeof(pBlock)); @@ -3318,8 +3293,9 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc int32_t blkId = *(int32_t*)tsortGetValue(pTupleHandle, 1); int32_t rowIdx = *(int32_t*)tsortGetValue(pTupleHandle, 2); SSDataBlock* pSrcBlk = NULL; + uInfo("sort tuple blkId %d, row idx %d", blkId, rowIdx); retrieveSourceBlock(pInfo, blkId, &pSrcBlk); - + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pSrcColInfo = taosArrayGet(pSrcBlk->pDataBlock, i); @@ -3328,7 +3304,7 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc if (isNull) { colDataSetNULL(pColInfo, pBlock->info.rows); } else { - char* pData = colDataGetData(pSrcColInfo, i); + char* pData = colDataGetData(pSrcColInfo, rowIdx); if (pData != NULL) { colDataSetVal(pColInfo, pBlock->info.rows, pData, false); } @@ -3488,23 +3464,16 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* int32_t startRowIdSort(STableMergeScanInfo *pInfo) { STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; - pSort->blkId = 0; - pSort->dataFileOffset = 0; - taosGetTmpfilePath(tsTempDir, "tms-block-info", pSort->idxPath); - pSort->idxFile = taosOpenFile(pSort->idxPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); - taosGetTmpfilePath(tsTempDir, "tms-block-data", pSort->dataPath); - pSort->dataFile = taosOpenFile(pSort->dataPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + createDiskbasedBuf(&pSort->pExtSrcBlkBuf, pInfo->bufPageSize, pInfo->sortBufSize, "tms-ext-src-block", tsTempDir); + dBufSetPrintInfo(pSort->pExtSrcBlkBuf); pSort->pBlkDataHash = tSimpleHashInit(2048, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); return 0; } int32_t stopRowIdSort(STableMergeScanInfo *pInfo) { STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo; - taosCloseFile(&pSort->idxFile); - taosRemoveFile(pSort->idxPath); - taosCloseFile(&pSort->dataFile); - taosRemoveFile(pSort->dataPath); + destroyDiskbasedBuf(pSort->pExtSrcBlkBuf); tSimpleHashCleanup(pSort->pBlkDataHash); return 0; }