fix: save work

This commit is contained in:
slzhou 2023-12-08 17:45:54 +08:00
parent 854766d986
commit 83139b8d48
1 changed files with 14 additions and 5 deletions

View File

@ -3251,7 +3251,8 @@ static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo,
const STmsSortRowIdInfo* pSortInfo = &pInfo->tmsSortRowIdInfo;
int32_t nRows = pSrcBlock->info.rows;
pSortInputBlk->info = pSrcBlock->info;
pSortInputBlk->info.window = pSrcBlock->info.window;
pSortInputBlk->info.id = pSrcBlock->info.id;
blockDataEnsureCapacity(pSortInputBlk, nRows);
int32_t tsSlotId = ((SBlockOrderInfo*)taosArrayGet(pInfo->pSortInfo, 0))->slotId;
@ -3266,6 +3267,8 @@ static int32_t fillSortInputBlock(const STableMergeScanInfo* pInfo,
for (int32_t i = 0; i < nRows; ++i) {
colDataSetInt32(rowIdxCol, i, &i);
}
pSortInputBlk->info.rows = nRows;
return 0;
}
@ -3305,15 +3308,17 @@ static int32_t retrieveSourceBlock(STableMergeScanInfo* pInfo, int32_t blockId,
LRUHandle* hBlkInfo = taosLRUCacheLookup(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId));
if (hBlkInfo) {
blkInfo = taosLRUCacheValue(pSortInfo->pBlkInfoCache, hBlkInfo);
uInfo("found block info: %d for %d, offset: %"PRId64", length: %d", blkInfo->blkId, blockId, blkInfo->offset, blkInfo->length)
} else {
blkInfo = taosMemoryMalloc(sizeof(STmsSortBlockInfo));
taosLSeekFile(pSortInfo->idxFile, blockId * sizeof(STmsSortBlockInfo), SEEK_SET);
taosReadFile(pSortInfo->idxFile, &blkInfo, sizeof(blkInfo));
taosReadFile(pSortInfo->idxFile, blkInfo, sizeof(STmsSortBlockInfo));
ASSERT(blkInfo->blkId == blockId);
taosLRUCacheInsert(pSortInfo->pBlkInfoCache, &blockId, sizeof(blockId), blkInfo, 1, deleteBlockInfoCache,
&hBlkInfo, TAOS_LRU_PRIORITY_LOW, NULL);
}
{
uInfo("retrieve block info: %d, offset: %"PRId64", length: %d", blkInfo->blkId, blkInfo->offset, blkInfo->length)
taosLSeekFile(pSortInfo->dataFile, blkInfo->offset, SEEK_SET);
char* buf = taosMemoryMalloc(blkInfo->length);
taosReadFile(pSortInfo->dataFile, buf, blkInfo->length);
@ -3443,14 +3448,17 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
if (pInfo->mergeLimit != -1) {
tableMergeScanDoSkipTable(pInfo, pBlock);
}
SSDataBlock* pSortInputBlk = NULL;
pOperator->resultInfo.totalRows += pBlock->info.rows;
SSDataBlock* pSortInputBlk = pInfo->pSortInputBlock;
if (pInfo->bSortRowId) {
pSortInputBlk = createOneDataBlock(pInfo->pSortInputBlock, false);
blockDataCleanup(pSortInputBlk);
transformIntoSortInputBlock(pInfo, pBlock, pSortInputBlk);
} else {
pSortInputBlk = pBlock;
}
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pSortInputBlk;
@ -3833,6 +3841,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
pInfo->bSortRowId = true;
if (!pInfo->bSortRowId) {
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);