enhance: save block then sort by row id

This commit is contained in:
shenglian zhou 2023-12-08 15:57:29 +08:00
parent 1ba1fbfabc
commit 854766d986
2 changed files with 212 additions and 6 deletions

View File

@ -273,6 +273,17 @@ typedef struct STableScanInfo {
bool filesetDelimited;
} STableScanInfo;
// State for the "sort by row id" mode of the table merge scan: source blocks
// are written to a data file and located again through an index file, so the
// sorter only has to carry (ts, blkId, rowIdx) tuples.
typedef struct STmsSortRowIdInfo {
// id given to the next source block that is saved; reset to 0 by startRowIdSort
int32_t blkId;
// offset in dataFile at which the next source block is written
int64_t dataFileOffset;
// index file: one STmsSortBlockInfo record per saved block, stored at
// blkId * sizeof(STmsSortBlockInfo)
TdFilePtr idxFile;
// temporary path of idxFile
char idxPath[PATH_MAX];
// data file holding the serialized source blocks
TdFilePtr dataFile;
// temporary path of dataFile
char dataPath[PATH_MAX];
SLRUCache* pBlkInfoCache; // blkId->(offset, len)
SLRUCache* pBlkDataCache; // blkId->SSDataBlock*
} STmsSortRowIdInfo;
typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
@ -301,6 +312,8 @@ typedef struct STableMergeScanInfo {
bool bNewFileset;
bool bOnlyRetrieveBlock;
bool filesetDelimited;
bool bSortRowId;
STmsSortRowIdInfo tmsSortRowIdInfo;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {

View File

@ -3219,6 +3219,141 @@ _error:
return NULL;
}
// ========================= table merge scan
// Index record describing where one saved source block lives in the data file.
// Instances are written to / read from the index file as raw bytes, so the
// field order and types define the on-disk record layout.
typedef struct STmsSortBlockInfo {
// id of the saved block; also determines the record position in the index file
int32_t blkId;
// size in bytes of the serialized block in the data file
int32_t length;
// byte offset of the serialized block in the data file
int64_t offset;
} STmsSortBlockInfo;
// Serialize pSrcBlock, write it to the data file at pSortInfo->dataFileOffset
// and record its (blkId, offset, length) in the index file at slot
// pSortInfo->blkId.
//
// pSortInfo  row-id sort state; blkId and dataFileOffset are read, not modified
// pSrcBlock  block to persist
// pSzBlk     out: serialized size in bytes (set once the block is serialized,
//            even when a later write fails)
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the serialization buffer
// cannot be allocated, or a system error code if a seek/write fails.
static int32_t saveSourceBlock(STmsSortRowIdInfo* pSortInfo, const SSDataBlock* pSrcBlock, int32_t *pSzBlk) {
  // payload + leading row count + one length field per column
  int32_t szBlk = blockDataGetSize(pSrcBlock) + sizeof(int32_t) + taosArrayGetSize(pSrcBlock->pDataBlock) * sizeof(int32_t);
  char*   buf = taosMemoryMalloc(szBlk);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  blockDataToBuf(buf, pSrcBlock);
  *pSzBlk = szBlk;

  int32_t code = 0;
  if (taosLSeekFile(pSortInfo->dataFile, pSortInfo->dataFileOffset, SEEK_SET) < 0 ||
      taosWriteFile(pSortInfo->dataFile, buf, szBlk) != szBlk) {
    code = TAOS_SYSTEM_ERROR(errno);
  }

  // the serialized copy is only needed for the write above
  taosMemoryFree(buf);
  if (code != 0) {
    return code;
  }

  STmsSortBlockInfo info = {.blkId = pSortInfo->blkId, .offset = pSortInfo->dataFileOffset, .length = szBlk};
  if (taosLSeekFile(pSortInfo->idxFile, pSortInfo->blkId * sizeof(STmsSortBlockInfo), SEEK_SET) < 0 ||
      taosWriteFile(pSortInfo->idxFile, &info, sizeof(info)) != (int64_t)sizeof(info)) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  return 0;
}
// Build the sorter input block for one source block. The output has three
// columns: 0 = timestamp copied from the source block, 1 = the id of the saved
// source block (same value on every row), 2 = the row index inside that block.
//
// pInfo          merge scan state; supplies the current blkId and the sort info
// pSrcBlock      source block the rows come from
// pSortInputBlk  destination block; its block info is overwritten with the
//                source block's info
//
// Returns 0 on success or the error code of the failing block/column helper.
static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo,
                                  const SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
  const STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
  int32_t                  nRows = pSrcBlock->info.rows;

  // Copy the source block info but keep the destination's own capacity: that
  // field describes the destination's allocated buffers, and taking over the
  // source's value could make the capacity check below skip the allocation.
  uint32_t dstCapacity = pSortInputBlk->info.capacity;
  pSortInputBlk->info = pSrcBlock->info;
  pSortInputBlk->info.capacity = dstCapacity;

  int32_t code = blockDataEnsureCapacity(pSortInputBlk, nRows);
  if (code != 0) {
    return code;
  }

  int32_t          tsSlotId = ((SBlockOrderInfo*)taosArrayGet(pInfo->pSortInfo, 0))->slotId;
  SColumnInfoData* tsCol = taosArrayGet(pSortInputBlk->pDataBlock, 0);
  SColumnInfoData* pSrcTsCol = taosArrayGet(pSrcBlock->pDataBlock, tsSlotId);
  code = colDataAssign(tsCol, pSrcTsCol, nRows, &pSortInputBlk->info);
  if (code != 0) {
    return code;
  }

  SColumnInfoData* blkIdCol = taosArrayGet(pSortInputBlk->pDataBlock, 1);
  code = colDataSetNItems(blkIdCol, 0, (char*)&pSortInfo->blkId, nRows, false);
  if (code != 0) {
    return code;
  }

  SColumnInfoData* rowIdxCol = taosArrayGet(pSortInputBlk->pDataBlock, 2);
  for (int32_t i = 0; i < nRows; ++i) {
    colDataSetInt32(rowIdxCol, i, &i);
  }

  return 0;
}
// Persist pSrcBlock to the row-id sort files and fill pSortInputBlk with the
// (ts, blkId, rowIdx) tuples that reference it, then advance the block id and
// the data-file write offset for the next block.
//
// Returns 0 on success. If saving the block fails, the error is returned and
// neither the sort input block nor the sort state is touched, so no tuple can
// reference a block that was not stored.
static int32_t transformIntoSortInputBlock(STableMergeScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pSortInputBlk) {
  //TODO: batch save
  enum { TMS_DATA_ALIGNMENT = 4096 };  // blocks start on 4 KiB boundaries in the data file

  STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
  int32_t            szBlk = 0;

  int32_t code = saveSourceBlock(pSortInfo, pSrcBlock, &szBlk);
  if (code != 0) {
    return code;
  }

  code = fillSortInputBlock(pInfo, pSrcBlock, pSortInputBlk);

  // The block is on disk at this point, so always move past it.
  ++pSortInfo->blkId;

  // Round the end of the block up to the next alignment boundary.
  int64_t mask = (int64_t)TMS_DATA_ALIGNMENT - 1;
  pSortInfo->dataFileOffset = (pSortInfo->dataFileOffset + szBlk + mask) & ~mask;

  return code;
}
// LRU deleter for the block-info cache: frees the cached value. The key, key
// length and user data arguments are not used.
static void deleteBlockInfoCache(const void *key, size_t keyLen, void *value, void *ud) {
  (void)key;
  (void)keyLen;
  (void)ud;

  taosMemoryFree(value);
}
static void deleteBlockDataCache(const void *key, size_t keyLen, void *value, void *ud) {
SSDataBlock* pBlock = value;
blockDataDestroy(pBlock);
}
// Return the saved source block with id blockId.
//
// The block-data cache is consulted first. On a miss the block's index record
// is taken from the block-info cache (or read from the index file and cached),
// the serialized block is read from the data file, rebuilt into a new
// SSDataBlock and stored in the block-data cache.
//
// pInfo    merge scan state (caches, files, block template)
// blockId  id assigned when the block was saved
// ppBlock  out: the block; it stays owned by the block-data cache
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or a
// system error code if a seek/read fails.
//
// NOTE(review): the handles obtained from lookup/insert are never released
// here; confirm whether that pins entries in the caches.
// NOTE(review): the status of taosLRUCacheInsert is not checked; this assumes
// the caches always accept inserts — confirm against the cache configuration.
static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId, SSDataBlock** ppBlock) {
  STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;

  LRUHandle* hBlk = taosLRUCacheLookup(pSortInfo->pBlkDataCache, &blockId, sizeof(blockId));
  if (hBlk != NULL) {
    *ppBlock = taosLRUCacheValue(pSortInfo->pBlkDataCache, hBlk);
    return 0;
  }

  // Locate the block: index record from the cache, otherwise from the index file.
  STmsSortBlockInfo* blkInfo = NULL;
  LRUHandle*         hBlkInfo = taosLRUCacheLookup(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId));
  if (hBlkInfo != NULL) {
    blkInfo = taosLRUCacheValue(pSortInfo->pBlkInfoCache, hBlkInfo);
  } else {
    blkInfo = taosMemoryMalloc(sizeof(*blkInfo));
    if (blkInfo == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // Read the whole record into the allocated struct (not into the pointer variable).
    if (taosLSeekFile(pSortInfo->idxFile, blockId * sizeof(STmsSortBlockInfo), SEEK_SET) < 0 ||
        taosReadFile(pSortInfo->idxFile, blkInfo, sizeof(*blkInfo)) != (int64_t)sizeof(*blkInfo)) {
      int32_t code = TAOS_SYSTEM_ERROR(errno);
      taosMemoryFree(blkInfo);
      return code;
    }
    ASSERT(blkInfo->blkId == blockId);

    taosLRUCacheInsert(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId), blkInfo, 1, deleteBlockInfoCache,
                       &hBlkInfo, TAOS_LRU_PRIORITY_LOW, NULL);
  }

  // Load the serialized block and rebuild it.
  char* buf = taosMemoryMalloc(blkInfo->length);
  if (buf == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (taosLSeekFile(pSortInfo->dataFile, blkInfo->offset, SEEK_SET) < 0 ||
      taosReadFile(pSortInfo->dataFile, buf, blkInfo->length) != blkInfo->length) {
    int32_t code = TAOS_SYSTEM_ERROR(errno);
    taosMemoryFree(buf);
    return code;
  }

  SSDataBlock* pBlock = createOneDataBlock(pInfo->pReaderBlock, false);
  if (pBlock == NULL) {
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  blockDataFromBuf(pBlock, buf);
  // NOTE(review): assumes blockDataFromBuf copies out of buf rather than
  // keeping pointers into it — confirm before relying on this free.
  taosMemoryFree(buf);

  *ppBlock = pBlock;
  // Blocks belong in the block-data cache; the block-info cache holds index records.
  taosLRUCacheInsert(pSortInfo->pBlkDataCache, &blockId, sizeof(blockId), pBlock, 1, deleteBlockDataCache,
                     &hBlk, TAOS_LRU_PRIORITY_LOW, NULL);

  return 0;
}
// Append one row to pBlock from a sorted (ts, blkId, rowIdx) tuple: the tuple
// names a saved source block and a row inside it, and every column of that row
// is copied to the end of pBlock.
//
// pInfo         merge scan state used to retrieve the saved source block
// pBlock        result block; info.rows is incremented by one on success
// pTupleHandle  sorted tuple; column 1 holds the block id, column 2 the row index
//
// If the source block cannot be retrieved the function returns without
// appending anything, because the void interface has no way to report it.
void appendOneRowIdRowToDataBlock(STableMergeScanInfo* pInfo, SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
  int32_t blkId = *(int32_t*)tsortGetValue(pTupleHandle, 1);
  int32_t rowIdx = *(int32_t*)tsortGetValue(pTupleHandle, 2);

  SSDataBlock* pSrcBlk = NULL;
  if (retrieveSourceBlock(pInfo, blkId, &pSrcBlk) != 0 || pSrcBlk == NULL) {
    return;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
    SColumnInfoData* pSrcColInfo = taosArrayGet(pSrcBlk->pDataBlock, i);

    if (colDataIsNull_s(pSrcColInfo, rowIdx)) {
      colDataSetNULL(pColInfo, pBlock->info.rows);
      continue;
    }

    // i selects the column; the row inside the source block is rowIdx
    char* pData = colDataGetData(pSrcColInfo, rowIdx);
    if (pData != NULL) {
      colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
    }
  }

  pBlock->info.dataLoad = 1;
  pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
  pBlock->info.rows += 1;
}
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
@ -3308,11 +3443,17 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->mergeLimit != -1) {
tableMergeScanDoSkipTable(pInfo, pBlock);
}
SSDataBlock* pSortInputBlk = NULL;
if (pInfo->bSortRowId) {
pSortInputBlk = createOneDataBlock(pInfo->pSortInputBlock, false);
transformIntoSortInputBlock(pInfo, pBlock, pSortInputBlk);
} else {
pSortInputBlk = pBlock;
}
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
return pSortInputBlk;
}
return NULL;
@ -3353,6 +3494,33 @@ void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo*
return;
}
// Prepare the row-id sort state: reset the block id and data-file offset, open
// the temporary index and data files, and create the two LRU caches.
//
// Returns 0 on success, a system error code if a temporary file cannot be
// opened, or TSDB_CODE_OUT_OF_MEMORY if a cache cannot be created. On failure
// whatever was acquired so far stays in the state (everything else is NULL)
// so that stopRowIdSort can release it.
int32_t startRowIdSort(STableMergeScanInfo *pInfo) {
  enum { TMS_CACHE_CAPACITY = 2048 };
  const int32_t openFlags = TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL;

  STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;
  pSort->blkId = 0;
  pSort->dataFileOffset = 0;
  pSort->idxFile = NULL;
  pSort->dataFile = NULL;
  pSort->pBlkInfoCache = NULL;
  pSort->pBlkDataCache = NULL;

  taosGetTmpfilePath(tsTempDir, "tms-block-info", pSort->idxPath);
  pSort->idxFile = taosOpenFile(pSort->idxPath, openFlags);
  if (pSort->idxFile == NULL) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  taosGetTmpfilePath(tsTempDir, "tms-block-data", pSort->dataPath);
  pSort->dataFile = taosOpenFile(pSort->dataPath, openFlags);
  if (pSort->dataFile == NULL) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  pSort->pBlkInfoCache = taosLRUCacheInit(TMS_CACHE_CAPACITY, 0, 0.5);
  if (pSort->pBlkInfoCache == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  taosLRUCacheSetStrictCapacity(pSort->pBlkInfoCache, false);

  pSort->pBlkDataCache = taosLRUCacheInit(TMS_CACHE_CAPACITY, 0, 0.5);
  if (pSort->pBlkDataCache == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  // configure the data cache itself (the info cache was configured above)
  taosLRUCacheSetStrictCapacity(pSort->pBlkDataCache, false);

  return 0;
}
// Release everything startRowIdSort set up: close and remove the temporary
// index and data files and destroy both LRU caches. The cache pointers are
// cleared afterwards so that a repeated call does not clean up the same cache
// twice. Always returns 0.
int32_t stopRowIdSort(STableMergeScanInfo *pInfo) {
  STmsSortRowIdInfo* pSort = &pInfo->tmsSortRowIdInfo;

  taosCloseFile(&pSort->idxFile);
  taosRemoveFile(pSort->idxPath);

  taosCloseFile(&pSort->dataFile);
  taosRemoveFile(pSort->dataPath);

  taosLRUCacheCleanup(pSort->pBlkInfoCache);
  pSort->pBlkInfoCache = NULL;

  taosLRUCacheCleanup(pSort->pBlkDataCache);
  pSort->pBlkDataCache = NULL;

  return 0;
}
int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3361,6 +3529,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->bNewFileset = false;
startRowIdSort(pInfo);
pInfo->sortBufSize = 2048 * pInfo->bufPageSize;
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
@ -3399,6 +3568,8 @@ void stopDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortDestroySortHandle(pInfo->pSortHandle);
pInfo->pSortHandle = NULL;
stopRowIdSort(pInfo);
}
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
@ -3475,8 +3646,11 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
if (pTupleHandle == NULL) {
break;
}
if (!pInfo->bSortRowId) {
appendOneRowToDataBlock(pResBlock, pTupleHandle);
} else {
appendOneRowIdRowToDataBlock(pInfo, pResBlock, pTupleHandle);
}
if (pResBlock->info.rows >= capacity) {
break;
}
@ -3659,9 +3833,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
if (!pInfo->bSortRowId) {
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
} else {
SSDataBlock* pSortInput = createDataBlock();
SColumnInfoData tsCol = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1);
blockDataAppendColInfo(pSortInput, &tsCol);
SColumnInfoData blkIdCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 2);
blockDataAppendColInfo(pSortInput, &blkIdCol);
SColumnInfoData rowIdxCol = createColumnInfoData(TSDB_DATA_TYPE_INT, 4, 3);
blockDataAppendColInfo(pSortInput, &rowIdxCol);
pInfo->pSortInputBlock = pSortInput;
SArray* pList = taosArrayInit(1, sizeof(SBlockOrderInfo));
SBlockOrderInfo bi = {0};
bi.order = pInfo->base.cond.order;
bi.slotId = 0;
bi.nullFirst = NULL_ORDER_FIRST;
taosArrayPush(pList, &bi);
pInfo->pSortInfo = pList;
}
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
@ -3678,6 +3870,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;