Revert "fix(tsdb/read2): reset stt reader when suspended"
This reverts commit 079d7ff69e
.
This commit is contained in:
parent
7924d7e0af
commit
9ddab11d98
|
@ -1729,41 +1729,45 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
|
|
||||||
// row in last file block
|
// row in last file block
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
|
if (key < tsLast) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
} else if (key == ts) {
|
} else if (key > tsLast) {
|
||||||
SRow* pTSRow = NULL;
|
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
} else {
|
||||||
return code;
|
if (key > tsLast) {
|
||||||
}
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
|
} else if (key < tsLast) {
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
|
||||||
|
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr);
|
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
|
||||||
tsdbRowMergerClear(pMerger);
|
|
||||||
return code;
|
|
||||||
} else { // key > ts
|
|
||||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||||
}
|
}
|
||||||
} else { // desc order
|
|
||||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
|
|
||||||
}
|
}
|
||||||
|
// the following for key == tsLast
|
||||||
|
SRow* pTSRow = NULL;
|
||||||
|
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||||
|
|
||||||
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
|
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
|
|
||||||
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
|
|
||||||
|
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
|
taosMemoryFree(pTSRow);
|
||||||
|
tsdbRowMergerClear(pMerger);
|
||||||
|
return code;
|
||||||
|
|
||||||
} else { // only last block exists
|
} else { // only last block exists
|
||||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||||
}
|
}
|
||||||
|
@ -2190,7 +2194,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
TSDBROW *pRow = NULL, *piRow = NULL;
|
TSDBROW *pRow = NULL, *piRow = NULL;
|
||||||
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] :
|
||||||
|
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
|
||||||
if (pBlockScanInfo->iter.hasVal) {
|
if (pBlockScanInfo->iter.hasVal) {
|
||||||
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
|
||||||
}
|
}
|
||||||
|
@ -2564,9 +2569,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
// load the last data block of current table
|
// load the last data block of current table
|
||||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
|
if (pScanInfo == NULL) {
|
||||||
// reset the index in last block when handing a new file
|
tsdbError("table Iter is null, invalid pScanInfo, try next table %s", pReader->idStr);
|
||||||
// doCleanupTableScanInfo(pScanInfo);
|
|
||||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2575,8 +2579,15 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the index in last block when handing a new file
|
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
|
||||||
// doCleanupTableScanInfo(pScanInfo);
|
// reset the index in last block when handing a new file
|
||||||
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
|
if (!hasNexTable) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||||
if (!hasDataInLastFile) {
|
if (!hasDataInLastFile) {
|
||||||
|
@ -2667,16 +2678,32 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
|
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
|
||||||
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
|
||||||
} else {
|
} else {
|
||||||
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) {
|
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
|
||||||
// only return the rows in last block
|
int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
|
||||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
if (!bHasDataInLastBlock || ((ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.lastKey < tsLast) ||
|
||||||
ASSERT(tsLast >= pBlockInfo->record.lastKey);
|
(!ASCENDING_TRAVERSE(pReader->info.order) && pBlockInfo->record.firstKey > tsLast))) {
|
||||||
|
// whole block is required, return it directly
|
||||||
|
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
||||||
|
pInfo->rows = pBlockInfo->record.numRow;
|
||||||
|
pInfo->id.uid = pScanInfo->uid;
|
||||||
|
pInfo->dataLoad = 0;
|
||||||
|
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
|
||||||
|
setComposedBlockFlag(pReader, false);
|
||||||
|
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
|
||||||
|
|
||||||
|
// update the last key for the corresponding table
|
||||||
|
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
|
||||||
|
tsdbDebug("%p uid:%" PRIu64
|
||||||
|
" clean file block retrieved from file, global index:%d, "
|
||||||
|
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
||||||
|
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
|
||||||
|
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
|
||||||
|
} else {
|
||||||
SBlockData* pBData = &pReader->status.fileBlockData;
|
SBlockData* pBData = &pReader->status.fileBlockData;
|
||||||
tBlockDataReset(pBData);
|
tBlockDataReset(pBData);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
|
tsdbDebug("load data in last block firstly %s", pReader->idStr);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
@ -2707,23 +2734,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
|
||||||
pResBlock->info.rows, el, pReader->idStr);
|
pResBlock->info.rows, el, pReader->idStr);
|
||||||
}
|
}
|
||||||
} else { // whole block is required, return it directly
|
|
||||||
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
|
|
||||||
pInfo->rows = pBlockInfo->record.numRow;
|
|
||||||
pInfo->id.uid = pScanInfo->uid;
|
|
||||||
pInfo->dataLoad = 0;
|
|
||||||
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
|
|
||||||
setComposedBlockFlag(pReader, false);
|
|
||||||
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
|
|
||||||
|
|
||||||
// update the last key for the corresponding table
|
|
||||||
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
|
|
||||||
tsdbDebug("%p uid:%" PRIu64
|
|
||||||
" clean file block retrieved from file, global index:%d, "
|
|
||||||
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
|
|
||||||
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
|
|
||||||
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
|
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
|
||||||
|
@ -4097,11 +4109,6 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||||
|
|
||||||
tsdbDataFileReaderClose(&pReader->pFileReader);
|
tsdbDataFileReaderClose(&pReader->pFileReader);
|
||||||
|
|
||||||
int64_t loadBlocks = 0;
|
|
||||||
double elapse = 0;
|
|
||||||
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &loadBlocks, &elapse);
|
|
||||||
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
|
||||||
|
|
||||||
// resetDataBlockScanInfo excluding lastKey
|
// resetDataBlockScanInfo excluding lastKey
|
||||||
STableBlockScanInfo** p = NULL;
|
STableBlockScanInfo** p = NULL;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
|
@ -4172,7 +4179,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
|
tsdbUntakeReadSnap(pReader, pReader->pReadSnap, false);
|
||||||
pReader->pReadSnap = NULL;
|
pReader->pReadSnap = NULL;
|
||||||
pReader->flag = READER_STATUS_SUSPEND;
|
pReader->flag = READER_STATUS_SUSPEND;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue