fix(tsdb): check and return if the rows in stt are before the data rows in data files.
This commit is contained in:
parent
3236ef7bae
commit
6736fd1615
|
@ -2628,6 +2628,58 @@ static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) {
|
||||||
return (pStatus->pProcMemTableIter != NULL);
|
return (pStatus->pProcMemTableIter != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) {
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
||||||
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
|
SDataBlockInfo* pInfo = &pResBlock->info;
|
||||||
|
blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt);
|
||||||
|
|
||||||
|
pInfo->rows = pScanInfo->numOfRowsInStt;
|
||||||
|
pInfo->id.uid = pScanInfo->uid;
|
||||||
|
pInfo->dataLoad = 1;
|
||||||
|
pInfo->window = pScanInfo->sttWindow;
|
||||||
|
|
||||||
|
setComposedBlockFlag(pReader, true);
|
||||||
|
|
||||||
|
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
||||||
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
|
pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
||||||
|
pScanInfo->sttBlockReturned = true;
|
||||||
|
|
||||||
|
pSttBlockReader->mergeTree.pIter = NULL;
|
||||||
|
|
||||||
|
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
|
||||||
|
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
|
pResBlock->info.rows, pReader->idStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo,
|
||||||
|
SFileDataBlockInfo* pBlockInfo, int32_t blockIndex) {
|
||||||
|
// whole block is required, return it directly
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
|
pInfo->rows = pBlockInfo->numRow;
|
||||||
|
pInfo->id.uid = pScanInfo->uid;
|
||||||
|
pInfo->dataLoad = 0;
|
||||||
|
pInfo->version = pReader->info.verRange.maxVer;
|
||||||
|
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
||||||
|
setComposedBlockFlag(pReader, false);
|
||||||
|
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
||||||
|
|
||||||
|
// update the last key for the corresponding table
|
||||||
|
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
||||||
|
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
|
||||||
|
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||||
|
pReader, pScanInfo->uid, blockIndex, pBlockInfo->tbBlockIdx, pBlockInfo->numRow, pBlockInfo->firstKey,
|
||||||
|
pBlockInfo->lastKey, pReader->idStr);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
||||||
|
@ -2680,28 +2732,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
// if only require the total rows, no need to load data from stt file if it is clean stt blocks
|
// if only require the total rows, no need to load data from stt file if it is clean stt blocks
|
||||||
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
|
if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) {
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
buildCleanBlockFromSttFiles(pReader, pScanInfo);
|
||||||
|
|
||||||
SDataBlockInfo* pInfo = &pResBlock->info;
|
|
||||||
blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt);
|
|
||||||
|
|
||||||
pInfo->rows = pScanInfo->numOfRowsInStt;
|
|
||||||
pInfo->id.uid = pScanInfo->uid;
|
|
||||||
pInfo->dataLoad = 1;
|
|
||||||
pInfo->window = pScanInfo->sttWindow;
|
|
||||||
|
|
||||||
setComposedBlockFlag(pReader, true);
|
|
||||||
|
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = asc ? pScanInfo->sttWindow.ekey + 1 : pScanInfo->sttWindow.skey - 1;
|
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
|
||||||
pScanInfo->lastProcKey = asc ? pScanInfo->sttWindow.ekey : pScanInfo->sttWindow.skey;
|
|
||||||
pScanInfo->sttBlockReturned = true;
|
|
||||||
|
|
||||||
pSttBlockReader->mergeTree.pIter = NULL;
|
|
||||||
|
|
||||||
tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s",
|
|
||||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
|
||||||
pResBlock->info.rows, pReader->idStr);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2741,10 +2772,11 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
|
// current active data block not overlap with the stt-files/stt-blocks
|
||||||
|
static bool notOverlapWithFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
|
||||||
|
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
|
if ((!hasDataInSttBlock(pScanInfo)) || (pScanInfo->cleanSttBlocks == true)) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
|
@ -2794,24 +2826,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
|
int64_t endKey = getBoarderKeyInFiles(pBlockInfo, pScanInfo, pReader->info.order);
|
||||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||||
} else {
|
} else {
|
||||||
if (notOverlapWithSttFiles(pBlockInfo, pScanInfo, asc)) {
|
if (notOverlapWithFiles(pBlockInfo, pScanInfo, asc)) {
|
||||||
// whole block is required, return it directly
|
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
|
||||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
|
||||||
pInfo->rows = pBlockInfo->numRow;
|
|
||||||
pInfo->id.uid = pScanInfo->uid;
|
|
||||||
pInfo->dataLoad = 0;
|
|
||||||
pInfo->version = pReader->info.verRange.maxVer;
|
|
||||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->firstKey, .ekey = pBlockInfo->lastKey};
|
|
||||||
setComposedBlockFlag(pReader, false);
|
|
||||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order);
|
|
||||||
|
|
||||||
// update the last key for the corresponding table
|
if ((!hasDataInSttBlock(pScanInfo)) || (asc && pBlockInfo->lastKey < keyInStt) ||
|
||||||
pScanInfo->lastProcKey = asc ? pInfo->window.ekey : pInfo->window.skey;
|
(!asc && pBlockInfo->firstKey > keyInStt)) {
|
||||||
tsdbDebug("%p uid:%" PRIu64
|
if (pScanInfo->cleanSttBlocks && hasDataInSttBlock(pScanInfo)) {
|
||||||
" clean file block retrieved from file, global index:%d, "
|
if (asc) { // file block is located before the stt block
|
||||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey);
|
||||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->numRow,
|
} else { // stt block is before the file block
|
||||||
pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buildCleanBlockFromDataFiles(pReader, pScanInfo, pBlockInfo, pBlockIter->index);
|
||||||
|
} else { // clean stt block
|
||||||
|
if (asc) {
|
||||||
|
ASSERT(pScanInfo->sttWindow.ekey < pBlockInfo->firstKey);
|
||||||
|
} else {
|
||||||
|
ASSERT(pScanInfo->sttWindow.skey > pBlockInfo->lastKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
// return the stt file block
|
||||||
|
ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL);
|
||||||
|
buildCleanBlockFromSttFiles(pReader, pScanInfo);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
SBlockData* pBData = &pReader->status.fileBlockData;
|
SBlockData* pBData = &pReader->status.fileBlockData;
|
||||||
tBlockDataReset(pBData);
|
tBlockDataReset(pBData);
|
||||||
|
@ -2822,7 +2862,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
// let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files
|
||||||
pScanInfo->cleanSttBlocks = false;
|
|
||||||
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
|
||||||
// no data in stt block, no need to proceed.
|
// no data in stt block, no need to proceed.
|
||||||
|
@ -2840,8 +2879,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
||||||
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) ||
|
if ((lastKeyInStt >= pBlockInfo->firstKey && asc) || (lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
||||||
(lastKeyInStt <= pBlockInfo->lastKey && (!asc))) {
|
|
||||||
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
||||||
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
lastKeyInStt, pBlockInfo->firstKey, pBlockInfo->lastKey, pReader->idStr);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue