fix(query): get if the correct rows in last block exists, and set the correct flag to denote if the last blocks index have been loaded or not.

This commit is contained in:
Haojun Liao 2022-11-02 22:27:23 +08:00
parent ecad94c419
commit fc20be8699
4 changed files with 19 additions and 15 deletions

View File

@ -643,6 +643,7 @@ typedef struct SSttBlockLoadInfo {
STSchema *pSchema;
int16_t *colIds;
int32_t numOfCols;
bool sttBlockLoaded;
} SSttBlockLoadInfo;
typedef struct SMergeTree {

View File

@ -72,6 +72,7 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
pLoadInfo[i].elapsedTime = 0;
pLoadInfo[i].loadBlocks = 0;
pLoadInfo[i].sttBlockLoaded = false;
}
}
@ -278,9 +279,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size == 0) {
// size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
pBlockLoadInfo->sttBlockLoaded = true;
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
@ -288,7 +290,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
}
// only apply to the child tables, ordinary tables will not incur this filter procedure.
size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
@ -296,10 +298,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid == suid) {
// do nothing
} else if (pStart->suid != suid) {
if (pStart->suid != suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
(*pIter)->iSttBlk = -1;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
@ -330,7 +332,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
}
size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block
(*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);

View File

@ -2112,8 +2112,8 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table.
if (pLBlockReader->uid == pScanInfo->uid && hasDataInLastBlock(pLBlockReader)) {
return true;
if (pLBlockReader->uid == pScanInfo->uid) {
return hasDataInLastBlock(pLBlockReader);
}
if (pLBlockReader->uid != 0) {
@ -2246,6 +2246,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (pReader->order == TSDB_ORDER_ASC ||
(pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
// record the last key value
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order)? pBlock->maxKey.ts:pBlock->minKey.ts;
goto _end;
}
}
@ -2637,12 +2640,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pInfo->window = (STimeWindow){.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlock->maxKey.ts, pReader->order);
}
}
if (code == TSDB_CODE_SUCCESS) {
STimeWindow* pWin = &pReader->pResBlock->info.window;
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order)? pWin->ekey:pWin->skey;
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order)? pInfo->window.ekey:pInfo->window.skey;
}
}
return code;

View File

@ -25,7 +25,7 @@ from util.sqlset import TDSetSql
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
self.dbname = 'db_test'
self.setsql = TDSetSql()
self.stbname = 'stb'