feat: use disk based buf for src block storage

This commit is contained in:
slzhou 2023-12-10 22:25:46 +08:00
parent d01e82439e
commit c705a71bd9
2 changed files with 40 additions and 76 deletions

View File

@ -274,12 +274,7 @@ typedef struct STableScanInfo {
} STableScanInfo;
typedef struct STmsSortRowIdInfo {
int32_t blkId;
int64_t dataFileOffset;
TdFilePtr idxFile;
char idxPath[PATH_MAX];
TdFilePtr dataFile;
char dataPath[PATH_MAX];
SDiskbasedBuf* pExtSrcBlkBuf;
SSHashObj* pBlkDataHash; // blkId->SSDataBlock*
} STmsSortRowIdInfo;

View File

@ -3220,36 +3220,8 @@ _error:
}
// ========================= table merge scan
typedef struct STmsSortBlockInfo {
int32_t blkId;
int32_t length;
int64_t offset;
} STmsSortBlockInfo;
static int32_t saveSourceBlock(STmsSortRowIdInfo* pSortInfo, const SSDataBlock* pSrcBlock, int32_t *pSzBlk) {
int32_t szBlk = blockDataGetSize(pSrcBlock) + sizeof(int32_t) + taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(int32_t);
char* buf = taosMemoryMalloc(szBlk);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
blockDataToBuf(buf, pSrcBlock);
taosLSeekFile(pSortInfo->dataFile, pSortInfo->dataFileOffset, SEEK_SET);
taosWriteFile(pSortInfo->dataFile, buf, szBlk);
taosMemoryFree(buf);
STmsSortBlockInfo info = {.blkId = pSortInfo->blkId
, .offset = pSortInfo->dataFileOffset, .length = szBlk};
taosLSeekFile(pSortInfo->idxFile, pSortInfo->blkId*sizeof(STmsSortBlockInfo), SEEK_SET);
taosWriteFile(pSortInfo->idxFile, &info, sizeof(info));
*pSzBlk = szBlk;
return 0;
}
static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo,
const SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
const STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
int32_t nRows = pSrcBlock->info.rows;
pSortInputBlk->info.window = pSrcBlock->info.window;
@ -3262,32 +3234,41 @@ static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo,
colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info);
SColumnInfoData* blkIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1);
colDataSetNItems(blkIdCol, 0, (char*)&pSortInfo->blkId, nRows, false);
SColumnInfoData* rowIdxCol = taosArrayGet(pSortInputBlk->pDataBlock, 2);
for (int32_t i = 0; i < nRows; ++i) {
colDataSetInt32(rowIdxCol, i, &i);
int32_t start = 0;
while (start < pSrcBlock->info.rows) {
int32_t stop = 0;
blockDataSplitRows(pSrcBlock, pSrcBlock->info.hasVarCol, start, &stop, pInfo->bufPageSize);
SSDataBlock* p = blockDataExtractBlock(pSrcBlock, start, stop-start+1);
int32_t pageId = -1;
void* pPage = getNewBufPage(pSortInfo->pExtSrcBlkBuf, &pageId);
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + taosArrayGetSize(p->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pSortInfo->pExtSrcBlkBuf));
blockDataToBuf(pPage, p);
setBufPageDirty(pPage, true);
releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage);
blockDataDestroy(p);
uInfo("sort input block pageId %d start %d, stop %d", pageId, start, stop);
colDataSetNItems(blkIdCol, start, (char*)&pageId, stop-start+1, false);
for (int32_t i = start; i <= stop; ++i) {
int32_t rowIdx = i - start;
colDataSetInt32(rowIdxCol, i, &rowIdx);
}
start = stop + 1;
}
pSortInputBlk->info.rows = nRows;
return 0;
}
static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
//TODO: batch save
int32_t code = 0;
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
int32_t szBlk = 0;
code = saveSourceBlock(pSortInfo, pSrcBlock, &szBlk);
fillSortInputBlock(pInfo, pSrcBlock, pSortInputBlk);
++pSortInfo->blkId;
pSortInfo->dataFileOffset = ((pSortInfo->dataFileOffset + szBlk) + 4096) & ~4096;
return code;
}
static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock) {
STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
@ -3297,16 +3278,10 @@ static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId,
*ppBlock = *(SSDataBlock**)pBlkHashVal;
}
else {
STmsSortBlockInfo blkInfo = {0};
taosLSeekFile(pSortInfo->idxFile, blockId * sizeof(STmsSortBlockInfo), SEEK_SET);
taosReadFile(pSortInfo->idxFile, &blkInfo, sizeof(STmsSortBlockInfo));
taosLSeekFile(pSortInfo->dataFile, blkInfo.offset, SEEK_SET);
char* buf = taosMemoryMalloc(blkInfo.length);
taosReadFile(pSortInfo->dataFile, buf, blkInfo.length);
void* pPage = getBufPage(pSortInfo->pExtSrcBlkBuf, blockId);
SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false);
blockDataFromBuf(pBlock, buf);
taosMemoryFree(buf);
blockDataFromBuf(pBlock, pPage);
releaseBufPage(pSortInfo->pExtSrcBlkBuf, pPage);
*ppBlock = pBlock;
tSimpleHashPut(pSortInfo->pBlkDataHash, &blockId, sizeof(blockId), &pBlock, sizeof(pBlock));
@ -3318,6 +3293,7 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc
int32_t blkId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
int32_t rowIdx = *(int32_t*)tsortGetValue(pTupleHandle, 2);
SSDataBlock* pSrcBlk = NULL;
uInfo("sort tuple blkId %d, row idx %d", blkId, rowIdx);
retrieveSourceBlock(pInfo, blkId, &pSrcBlk);
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
@ -3328,7 +3304,7 @@ void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBloc
if (isNull) {
colDataSetNULL(pColInfo, pBlock->info.rows);
} else {
char* pData = colDataGetData(pSrcColInfo, i);
char* pData = colDataGetData(pSrcColInfo, rowIdx);
if (pData != NULL) {
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
}
@ -3488,23 +3464,16 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;
pSort->blkId = 0;
pSort->dataFileOffset = 0;
taosGetTmpfilePath(tsTempDir, "tms-block-info", pSort->idxPath);
pSort->idxFile = taosOpenFile(pSort->idxPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
taosGetTmpfilePath(tsTempDir, "tms-block-data", pSort->dataPath);
pSort->dataFile = taosOpenFile(pSort->dataPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
createDiskbasedBuf(&pSort->pExtSrcBlkBuf, pInfo->bufPageSize, pInfo->sortBufSize, "tms-ext-src-block", tsTempDir);
dBufSetPrintInfo(pSort->pExtSrcBlkBuf);
pSort->pBlkDataHash = tSimpleHashInit(2048, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
return 0;
}
int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;
taosCloseFile(&pSort->idxFile);
taosRemoveFile(pSort->idxPath);
taosCloseFile(&pSort->dataFile);
taosRemoveFile(pSort->dataPath);
destroyDiskbasedBuf(pSort->pExtSrcBlkBuf);
tSimpleHashCleanup(pSort->pBlkDataHash);
return 0;
}