enh(tsdb): on-demand open stt file reader.
This commit is contained in:
parent
53b2158c54
commit
675c9ca613
|
@ -565,9 +565,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
w.skey = pScanInfo->lastKey + step;
|
||||
w.skey = pScanInfo->lastProcKey + step;
|
||||
} else {
|
||||
w.ekey = pScanInfo->lastKey + step;
|
||||
w.ekey = pScanInfo->lastProcKey + step;
|
||||
}
|
||||
|
||||
if (isEmptyQueryTimeWindow(&w)) {
|
||||
|
@ -607,14 +607,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
|||
|
||||
clearBrinBlockIter(&iter);
|
||||
|
||||
pBlockNum->numOfLastFiles = pReader->status.pCurrentFileset->lvlArr->size;
|
||||
int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks;
|
||||
pBlockNum->numOfSttFiles = pReader->status.pCurrentFileset->lvlArr->size;
|
||||
int32_t total = pBlockNum->numOfSttFiles + pBlockNum->numOfBlocks;
|
||||
|
||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug(
|
||||
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
|
||||
"time:%.2f ms %s",
|
||||
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles,
|
||||
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfSttFiles,
|
||||
sizeInDisk / 1000.0, el, pReader->idStr);
|
||||
|
||||
pReader->cost.numOfBlocks += total;
|
||||
|
@ -1200,13 +1200,12 @@ static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* p
|
|||
}
|
||||
}
|
||||
|
||||
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader* pLastBlockReader, int32_t order) {
|
||||
static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo, int32_t order) {
|
||||
bool ascScan = ASCENDING_TRAVERSE(order);
|
||||
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
|
||||
|
||||
int64_t key = 0;
|
||||
if (bHasDataInLastBlock) {
|
||||
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
key = ascScan ? TMIN(pBlock->record.firstKey, keyInStt) : TMAX(pBlock->record.lastKey, keyInStt);
|
||||
} else {
|
||||
key = ascScan ? pBlock->record.firstKey : pBlock->record.lastKey;
|
||||
|
@ -1215,10 +1214,10 @@ static int64_t getBoarderKeyInFiles(SFileDataBlockInfo* pBlock, SLastBlockReader
|
|||
return key;
|
||||
}
|
||||
|
||||
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock,
|
||||
SLastBlockReader* pLastBlockReader, int32_t order) {
|
||||
static bool bufferDataInFileBlockGap(TSDBKEY keyInBuf, SFileDataBlockInfo* pBlock, STableBlockScanInfo* pScanInfo,
|
||||
int32_t order) {
|
||||
bool ascScan = ASCENDING_TRAVERSE(order);
|
||||
int64_t key = getBoarderKeyInFiles(pBlock, pLastBlockReader, order);
|
||||
int64_t key = getBoarderKeyInFiles(pBlock, pScanInfo, order);
|
||||
|
||||
return (ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts < key)) ||
|
||||
(!ascScan && (keyInBuf.ts != TSKEY_INITIAL_VAL && keyInBuf.ts > key));
|
||||
|
@ -1302,10 +1301,9 @@ typedef struct {
|
|||
} SDataBlockToLoadInfo;
|
||||
|
||||
static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo,
|
||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader,
|
||||
STsdbReader* pReader) {
|
||||
int32_t neighborIndex = 0;
|
||||
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, STsdbReader* pReader) {
|
||||
SBrinRecord rec = {0};
|
||||
int32_t neighborIndex = 0;
|
||||
|
||||
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
|
||||
pReader->info.order, &rec);
|
||||
|
@ -1319,9 +1317,11 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count) || (pBlockInfo->record.count <= 0);
|
||||
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order);
|
||||
|
||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
pInfo->overlapWithLastBlock = !(pBlockInfo->record.lastKey < tsLast || pBlockInfo->record.firstKey > tsLast);
|
||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA) {
|
||||
int64_t nextProcKeyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
pInfo->overlapWithLastBlock =
|
||||
!(pBlockInfo->record.lastKey < nextProcKeyInStt || pBlockInfo->record.firstKey > nextProcKeyInStt);
|
||||
}
|
||||
|
||||
pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity;
|
||||
|
@ -1336,9 +1336,9 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
|
|||
// 5. delete info should not overlap with current block data
|
||||
// 6. current block should not contain the duplicated ts
|
||||
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
|
||||
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
|
||||
TSDBKEY keyInBuf) {
|
||||
SDataBlockToLoadInfo info = {0};
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
|
||||
|
||||
bool loadDataBlock =
|
||||
(info.overlapWithNeighborBlock || info.hasDupTs || info.partiallyRequired || info.overlapWithKeyInBuf ||
|
||||
|
@ -1358,9 +1358,9 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
|
|||
}
|
||||
|
||||
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
|
||||
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
|
||||
TSDBKEY keyInBuf) {
|
||||
SDataBlockToLoadInfo info = {0};
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
|
||||
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pReader);
|
||||
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
|
||||
info.overlapWithDelInfo || info.overlapWithLastBlock);
|
||||
return isCleanFileBlock;
|
||||
|
@ -1417,14 +1417,15 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
|
||||
SVersionRange* pVerRange) {
|
||||
static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
|
||||
SVersionRange* pVerRange) {
|
||||
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
|
||||
|
||||
while (1) {
|
||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||
if (!hasVal) { // the next value will be the accessed key in stt
|
||||
pScanInfo->lastKeyInStt += step;
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||
pScanInfo->sttKeyInfo.nextProcKey += step;
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1433,10 +1434,11 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
|||
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
|
||||
|
||||
pLastBlockReader->currentKey = key;
|
||||
pScanInfo->lastKeyInStt = key;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||
|
||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
|
||||
pVerRange)) {
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1457,7 +1459,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
|
|||
|
||||
// avoid the fetch next row replace the referenced stt block in buffer
|
||||
doPinSttBlock(pLastBlockReader);
|
||||
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
doUnpinSttBlock(pLastBlockReader);
|
||||
if (hasVal) {
|
||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
|
@ -1694,7 +1696,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
|||
}
|
||||
|
||||
if (copied) {
|
||||
pBlockScanInfo->lastKey = tsLastBlock;
|
||||
pBlockScanInfo->lastProcKey = tsLastBlock;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
|
||||
|
@ -2062,9 +2064,9 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
|
|||
STbData* d = NULL;
|
||||
TSDBKEY startKey = {0};
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer};
|
||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey + 1, .version = pReader->info.verRange.minVer};
|
||||
} else {
|
||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer};
|
||||
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastProcKey - 1, .version = pReader->info.verRange.maxVer};
|
||||
}
|
||||
|
||||
int32_t code =
|
||||
|
@ -2129,9 +2131,9 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
|
||||
STimeWindow w = pLBlockReader->window;
|
||||
if (ASCENDING_TRAVERSE(pLBlockReader->order)) {
|
||||
w.skey = pScanInfo->lastKeyInStt;
|
||||
w.skey = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
} else {
|
||||
w.ekey = pScanInfo->lastKeyInStt;
|
||||
w.ekey = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
@ -2164,7 +2166,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
|||
initMemDataIterator(pScanInfo, pReader);
|
||||
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
|
||||
|
||||
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
|
||||
|
||||
int64_t el = taosGetTimestampUs() - st;
|
||||
pReader->cost.initLastBlockReader += (el / 1000.0);
|
||||
|
@ -2209,7 +2211,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
|||
}
|
||||
|
||||
if (copied) {
|
||||
pBlockScanInfo->lastKey = key;
|
||||
pBlockScanInfo->lastProcKey = key;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
@ -2354,7 +2356,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
|
||||
|
||||
// it is a clean block, load it directly
|
||||
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
|
||||
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) &&
|
||||
(pRecord->numRow <= pReader->resBlockInfo.capacity)) {
|
||||
if (asc || (!hasDataInLastBlock(pLastBlockReader))) {
|
||||
code = copyBlockDataToSDataBlock(pReader);
|
||||
|
@ -2363,7 +2365,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
// record the last key value
|
||||
pBlockScanInfo->lastKey = asc ? pRecord->lastKey : pRecord->firstKey;
|
||||
pBlockScanInfo->lastProcKey = asc ? pRecord->lastKey : pRecord->firstKey;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
@ -2527,7 +2529,7 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
|
|||
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
pBlockNum->numOfBlocks = 0;
|
||||
pBlockNum->numOfLastFiles = 0;
|
||||
pBlockNum->numOfSttFiles = 0;
|
||||
|
||||
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk));
|
||||
|
@ -2564,7 +2566,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
|
|||
return code;
|
||||
}
|
||||
|
||||
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
|
||||
if (pBlockNum->numOfBlocks + pBlockNum->numOfSttFiles > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -2684,11 +2686,13 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, SLastBlockReader* pLastBlockReader, bool asc) {
|
||||
if(!hasDataInLastBlock(pLastBlockReader)) {
|
||||
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
|
||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||
|
||||
if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
|
||||
return true;
|
||||
} else {
|
||||
int64_t keyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||
return (asc && pBlockInfo->record.lastKey < keyInStt) || (!asc && pBlockInfo->record.firstKey > keyInStt);
|
||||
}
|
||||
}
|
||||
|
@ -2717,10 +2721,12 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||
}
|
||||
|
||||
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader)) {
|
||||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||
if (fileBlockShouldLoad(pReader, pBlockInfo, pScanInfo, keyInBuf)) {
|
||||
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2728,13 +2734,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
|
||||
// build composed data block
|
||||
code = buildComposedDataBlock(pReader);
|
||||
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pLastBlockReader, pReader->info.order)) {
|
||||
} else if (bufferDataInFileBlockGap(keyInBuf, pBlockInfo, pScanInfo, pReader->info.order)) {
|
||||
// data in memory that are earlier than current file block and stt blocks
|
||||
// rows in buffer should be less than the file block in asc, greater than file block in desc
|
||||
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pLastBlockReader, pReader->info.order);
|
||||
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
|
||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||
} else {
|
||||
if (notOverlapWithSttFiles(pBlockInfo, pLastBlockReader, asc)) {
|
||||
if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) {
|
||||
// whole block is required, return it directly
|
||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||
pInfo->rows = pBlockInfo->record.numRow;
|
||||
|
@ -2745,7 +2751,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
|
||||
|
||||
// update the last key for the corresponding table
|
||||
pScanInfo->lastKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||
tsdbDebug("%p uid:%" PRIu64
|
||||
" clean file block retrieved from file, global index:%d, "
|
||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||
|
@ -2760,8 +2766,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
|||
tsdbDebug("load data in last block firstly %s", pReader->idStr);
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
// let's load data from stt files
|
||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||
|
||||
// no data in last block, no need to proceed.
|
||||
while (hasDataInLastBlock(pLastBlockReader)) {
|
||||
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
||||
|
||||
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -2988,7 +2999,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
|
|||
if (pBlockInfo) {
|
||||
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||
if (pScanInfo) {
|
||||
lastKey = pScanInfo->lastKey;
|
||||
lastKey = pScanInfo->lastProcKey;
|
||||
}
|
||||
|
||||
pDumpInfo->totalRows = pBlockInfo->record.numRow;
|
||||
|
@ -3013,7 +3024,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
|
|||
}
|
||||
|
||||
// all data files are consumed, try data in buffer
|
||||
if (num.numOfBlocks + num.numOfLastFiles == 0) {
|
||||
if (num.numOfBlocks + num.numOfSttFiles == 0) {
|
||||
pReader->status.loadFromFile = false;
|
||||
taosArrayDestroy(pTableList);
|
||||
return code;
|
||||
|
@ -3458,15 +3469,15 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
|
||||
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
||||
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
|
||||
while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
|
||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||
if (next1 == ts) {
|
||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||
} else {
|
||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
|
||||
idStr);
|
||||
pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline),
|
||||
pScanInfo->sttKeyInfo.nextProcKey, idStr);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -3722,7 +3733,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
|||
|
||||
pBlock->info.dataLoad = 1;
|
||||
pBlock->info.rows += 1;
|
||||
pScanInfo->lastKey = pTSRow->ts;
|
||||
pScanInfo->lastProcKey = pTSRow->ts;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3856,14 +3867,15 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
|
|||
// todo extract method
|
||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||
int64_t skey = pReader->info.window.skey;
|
||||
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pInfo->lastKeyInStt = skey;
|
||||
pInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pInfo->sttKeyInfo.nextProcKey = skey;
|
||||
} else {
|
||||
int64_t ekey = pReader->info.window.ekey;
|
||||
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pInfo->lastKeyInStt = ekey;
|
||||
pInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pInfo->sttKeyInfo.nextProcKey = ekey;
|
||||
}
|
||||
|
||||
pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
|
||||
}
|
||||
|
||||
|
@ -4224,7 +4236,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
if (pBlockScanInfo) {
|
||||
// save lastKey to restore memory iterator
|
||||
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
|
||||
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
|
||||
pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
|
||||
|
||||
// reset current current table's data block scan info,
|
||||
pBlockScanInfo->iterInit = false;
|
||||
|
|
|
@ -157,17 +157,18 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf
|
|||
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
|
||||
int64_t skey = pTsdbReader->info.window.skey;
|
||||
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pScanInfo->lastKeyInStt = skey;
|
||||
pScanInfo->lastProcKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = skey;
|
||||
} else {
|
||||
int64_t ekey = pTsdbReader->info.window.ekey;
|
||||
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pScanInfo->lastKeyInStt = ekey;
|
||||
pScanInfo->lastProcKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pScanInfo->sttKeyInfo.nextProcKey = ekey;
|
||||
}
|
||||
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
|
||||
pScanInfo->lastKey, pTsdbReader->idStr);
|
||||
pScanInfo->lastProcKey, pTsdbReader->idStr);
|
||||
}
|
||||
|
||||
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
@ -200,8 +201,8 @@ void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
|
|||
}
|
||||
|
||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||
pInfo->lastKey = ts;
|
||||
pInfo->lastKeyInStt = ts + step;
|
||||
pInfo->lastProcKey = ts;
|
||||
pInfo->sttKeyInfo.nextProcKey = ts + step;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -241,6 +242,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
|||
taosArrayClear(pScanInfo->pBlockList);
|
||||
taosArrayClear(pScanInfo->pBlockIdxList);
|
||||
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||
}
|
||||
|
||||
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {
|
||||
|
|
|
@ -63,20 +63,31 @@ typedef struct STableDataBlockIdx {
|
|||
int32_t globalIndex;
|
||||
} STableDataBlockIdx;
|
||||
|
||||
typedef enum ESttKeyStatus {
|
||||
STT_FILE_READER_UNINIT = 0x0,
|
||||
STT_FILE_NO_DATA = 0x1,
|
||||
STT_FILE_HAS_DATA = 0x2,
|
||||
} ESttKeyStatus;
|
||||
|
||||
typedef struct SSttKeyInfo {
|
||||
ESttKeyStatus status; // this value should be updated when switch to the next fileset
|
||||
int64_t nextProcKey;
|
||||
} SSttKeyInfo;
|
||||
|
||||
typedef struct STableBlockScanInfo {
|
||||
uint64_t uid;
|
||||
TSKEY lastKey;
|
||||
TSKEY lastKeyInStt; // last accessed key in stt
|
||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
|
||||
SArray* pMemDelData; // SArray<SDelData>
|
||||
SArray* pFileDelData; // SArray<SDelData> from each file set
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
SArray* delSkyline; // delete info for this table
|
||||
int32_t fileDelIndex; // file block delete index
|
||||
int32_t sttBlockDelIndex; // delete index for last block
|
||||
bool iterInit; // whether to initialize the in-memory skip list iterator or not
|
||||
uint64_t uid;
|
||||
TSKEY lastProcKey;
|
||||
SSttKeyInfo sttKeyInfo;
|
||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||
SArray* pBlockIdxList; // SArray<STableDataBlockIndx>
|
||||
SArray* pMemDelData; // SArray<SDelData>
|
||||
SArray* pFileDelData; // SArray<SDelData> from each file set
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
SArray* delSkyline; // delete info for this table
|
||||
int32_t fileDelIndex; // file block delete index
|
||||
int32_t sttBlockDelIndex; // delete index for last block
|
||||
bool iterInit; // whether to initialize the in-memory skip list iterator or not
|
||||
} STableBlockScanInfo;
|
||||
|
||||
typedef struct SResultBlockInfo {
|
||||
|
@ -108,7 +119,7 @@ typedef struct STableUidList {
|
|||
|
||||
typedef struct {
|
||||
int32_t numOfBlocks;
|
||||
int32_t numOfLastFiles;
|
||||
int32_t numOfSttFiles;
|
||||
} SBlockNumber;
|
||||
|
||||
typedef struct SBlockIndex {
|
||||
|
|
Loading…
Reference in New Issue