Merge pull request #16263 from taosdata/feature/3_liaohj
refactor(query): optimize load last block.
This commit is contained in:
commit
99b4987791
|
@ -94,7 +94,7 @@ typedef struct SLastBlockReader {
|
|||
SVersionRange verRange;
|
||||
int32_t order;
|
||||
uint64_t uid;
|
||||
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
|
||||
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
|
||||
} SLastBlockReader;
|
||||
|
||||
typedef struct SFilesetIter {
|
||||
|
@ -339,6 +339,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
|
|||
pLReader->order = pReader->order;
|
||||
pLReader->window = pReader->window;
|
||||
pLReader->verRange = pReader->verRange;
|
||||
pLReader->currentBlockIndex = -1;
|
||||
|
||||
int32_t code = tBlockDataCreate(&pLReader->lastBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2326,15 +2327,17 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* p
|
|||
|
||||
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
pBlockNum->numOfBlocks = 0;
|
||||
pBlockNum->numOfLastBlocks = 0;
|
||||
|
||||
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
|
||||
SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
|
||||
taosArrayClear(pLastBlocks);
|
||||
|
||||
while (1) {
|
||||
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader);
|
||||
if (!hasNext) { // no data files on disk
|
||||
taosArrayClear(pLastBlocks);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -2380,29 +2383,38 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo add elapsed time results
|
||||
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo *pBlockScanInfo, STsdbReader* pReader) {
|
||||
SArray* pBlocks = pLastBlockReader->pBlockL;
|
||||
SBlockL* pBlock = NULL;
|
||||
|
||||
uint64_t uid = pBlockScanInfo->uid;
|
||||
initMemDataIterator(pBlockScanInfo, pReader);
|
||||
pLastBlockReader->currentBlockIndex = -1;
|
||||
int32_t totalLastBlocks = (int32_t)taosArrayGetSize(pBlocks);
|
||||
|
||||
// find the correct SBlockL
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
|
||||
initMemDataIterator(pBlockScanInfo, pReader);
|
||||
|
||||
// find the correct SBlockL. todo binary search
|
||||
int32_t index = -1;
|
||||
for (int32_t i = 0; i < totalLastBlocks; ++i) {
|
||||
SBlockL* p = taosArrayGet(pBlocks, i);
|
||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||
pLastBlockReader->currentBlockIndex = i;
|
||||
index = i;
|
||||
pBlock = p;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pLastBlockReader->currentBlockIndex == -1) {
|
||||
if (index == -1) {
|
||||
pLastBlockReader->currentBlockIndex = index;
|
||||
tBlockDataReset(&pLastBlockReader->lastBlockData);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// the required last datablock has already loaded
|
||||
if (index == pLastBlockReader->currentBlockIndex) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p init block data failed, code:%s %s", pReader, tstrerror(code), pReader->idStr);
|
||||
|
@ -2411,13 +2423,16 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable
|
|||
|
||||
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError(
|
||||
"%p error occurs in loading last block into buffer, last block index:%d, total:%d rows:%d, minVer:%" PRId64
|
||||
", maxVer:%" PRId64 ", code:%s %s",
|
||||
pReader, pLastBlockReader->currentBlockIndex, (int32_t)taosArrayGetSize(pBlocks), pBlock->nRow, pBlock->minVer,
|
||||
pBlock->maxVer, tstrerror(code), pReader->idStr);
|
||||
tsdbError("%p error occurs in loading last block into buffer, last block index:%d, total:%d code:%s %s", pReader,
|
||||
pLastBlockReader->currentBlockIndex, totalLastBlocks, tstrerror(code), pReader->idStr);
|
||||
} else {
|
||||
tsdbDebug("%p load last block completed, uid:%" PRIu64
|
||||
" last block index:%d, total:%d rows:%d, minVer:%d, maxVer:%d, brange:%" PRId64 " - %" PRId64 " %s",
|
||||
pReader, uid, pLastBlockReader->currentBlockIndex, totalLastBlocks, pBlock->nRow, pBlock->minVer,
|
||||
pBlock->maxVer, pBlock->minKey, pBlock->maxKey, pReader->idStr);
|
||||
}
|
||||
|
||||
pLastBlockReader->currentBlockIndex = index;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2618,6 +2633,9 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
|
|||
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap);
|
||||
}
|
||||
|
||||
SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
|
||||
pLReader->currentBlockIndex = -1;
|
||||
|
||||
// set the correct start position according to the query time window
|
||||
initBlockDumpInfo(pReader, pBlockIter);
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue