fix(query): set the initial iterator table before check data in buffer.
This commit is contained in:
parent
30981ece7d
commit
f85cf08f87
|
@ -133,17 +133,17 @@ typedef struct SFileBlockDumpInfo {
|
||||||
bool allDumped;
|
bool allDumped;
|
||||||
} SFileBlockDumpInfo;
|
} SFileBlockDumpInfo;
|
||||||
|
|
||||||
typedef struct SUidOrderedList {
|
typedef struct STableUidList {
|
||||||
uint64_t* tableUidList; // access table uid list in uid ascending order list
|
uint64_t* tableUidList; // access table uid list in uid ascending order list
|
||||||
int32_t currentIndex; // index in table uid list
|
int32_t currentIndex; // index in table uid list
|
||||||
} SUidOrderedList;
|
} STableUidList;
|
||||||
|
|
||||||
typedef struct SReaderStatus {
|
typedef struct SReaderStatus {
|
||||||
bool loadFromFile; // check file stage
|
bool loadFromFile; // check file stage
|
||||||
bool composedDataBlock; // the returned data block is a composed block or not
|
bool composedDataBlock; // the returned data block is a composed block or not
|
||||||
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||||
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
|
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
|
||||||
SUidOrderedList uidCheckInfo; // check all table in uid order
|
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
|
||||||
SFileBlockDumpInfo fBlockDumpInfo;
|
SFileBlockDumpInfo fBlockDumpInfo;
|
||||||
SDFileSet* pCurrentFileset; // current opened file set
|
SDFileSet* pCurrentFileset; // current opened file set
|
||||||
SBlockData fileBlockData;
|
SBlockData fileBlockData;
|
||||||
|
@ -341,7 +341,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
initBlockScanInfoBuf(pBuf, numOfTables);
|
initBlockScanInfoBuf(pBuf, numOfTables);
|
||||||
|
|
||||||
SUidOrderedList* pOrderedCheckInfo = &pTsdbReader->status.uidCheckInfo;
|
STableUidList* pOrderedCheckInfo = &pTsdbReader->status.uidList;
|
||||||
|
|
||||||
pOrderedCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
pOrderedCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
||||||
if (pOrderedCheckInfo->tableUidList == NULL) {
|
if (pOrderedCheckInfo->tableUidList == NULL) {
|
||||||
|
@ -679,7 +679,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
||||||
int64_t et1 = taosGetTimestampUs();
|
int64_t et1 = taosGetTimestampUs();
|
||||||
|
|
||||||
SBlockIdx* pBlockIdx = NULL;
|
SBlockIdx* pBlockIdx = NULL;
|
||||||
SUidOrderedList* pList = &pReader->status.uidCheckInfo;
|
STableUidList* pList = &pReader->status.uidList;
|
||||||
|
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
while(i < num && j < numOfTables) {
|
while(i < num && j < numOfTables) {
|
||||||
|
@ -2768,27 +2768,15 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the last del file index
|
|
||||||
static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t order) {
|
|
||||||
void* p = taosHashIterate(pStatus->pTableMap, NULL);
|
|
||||||
while (p != NULL) {
|
|
||||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
|
|
||||||
|
|
||||||
// reset the last del file index
|
|
||||||
pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
|
|
||||||
p = taosHashIterate(pStatus->pTableMap, p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void resetTableListIndex(SReaderStatus *pStatus) {
|
static void resetTableListIndex(SReaderStatus *pStatus) {
|
||||||
SUidOrderedList* pList = &pStatus->uidCheckInfo;
|
STableUidList* pList = &pStatus->uidList;
|
||||||
|
|
||||||
pList->currentIndex = 0;
|
pList->currentIndex = 0;
|
||||||
uint64_t uid = pList->tableUidList[0];
|
uint64_t uid = pList->tableUidList[0];
|
||||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool moveToNextTable(SUidOrderedList* pOrderedCheckInfo, SReaderStatus* pStatus) {
|
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
|
||||||
pOrderedCheckInfo->currentIndex += 1;
|
pOrderedCheckInfo->currentIndex += 1;
|
||||||
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
|
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
|
||||||
pStatus->pTableIter = NULL;
|
pStatus->pTableIter = NULL;
|
||||||
|
@ -2803,8 +2791,8 @@ static bool moveToNextTable(SUidOrderedList* pOrderedCheckInfo, SReaderStatus* p
|
||||||
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
||||||
|
STableUidList* pUidList = &pStatus->uidList;
|
||||||
|
|
||||||
SUidOrderedList* pOrderedCheckInfo = &pStatus->uidCheckInfo;
|
|
||||||
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2817,7 +2805,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2852,7 +2840,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// current table is exhausted, let's try next table
|
// current table is exhausted, let's try next table
|
||||||
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2956,14 +2944,15 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
STableUidList* pUidList = &pStatus->uidList;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (pStatus->pTableIter == NULL) {
|
// if (pStatus->pTableIter == NULL) {
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
// pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
||||||
if (pStatus->pTableIter == NULL) {
|
// if (pStatus->pTableIter == NULL) {
|
||||||
return TSDB_CODE_SUCCESS;
|
// return TSDB_CODE_SUCCESS;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
|
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
|
||||||
initMemDataIterator(*pBlockScanInfo, pReader);
|
initMemDataIterator(*pBlockScanInfo, pReader);
|
||||||
|
@ -2978,9 +2967,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// current table is exhausted, let's try the next table
|
// current table is exhausted, let's try next table
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (pStatus->pTableIter == NULL) {
|
if (!hasNexTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3001,7 +2990,6 @@ void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||||
|
|
||||||
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
|
||||||
SBlockNumber num = {0};
|
SBlockNumber num = {0};
|
||||||
|
|
||||||
int32_t code = moveToNextFile(pReader, &num);
|
int32_t code = moveToNextFile(pReader, &num);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -3767,7 +3755,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
||||||
ASSERT(size >= num);
|
ASSERT(size >= num);
|
||||||
|
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
SUidOrderedList* pUidList = &pReader->status.uidCheckInfo;
|
STableUidList* pUidList = &pReader->status.uidList;
|
||||||
pUidList->currentIndex = 0;
|
pUidList->currentIndex = 0;
|
||||||
|
|
||||||
STableKeyInfo* pList = (STableKeyInfo*)pTableList;
|
STableKeyInfo* pList = (STableKeyInfo*)pTableList;
|
||||||
|
@ -3799,18 +3787,24 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
||||||
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
|
||||||
|
|
||||||
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
|
|
||||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
resetDataBlockIterator(&pStatus->blockIter, pReader->order);
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pReader->status.fileIter.numOfFiles == 0) {
|
if (pStatus->fileIter.numOfFiles == 0) {
|
||||||
pReader->status.loadFromFile = false;
|
pStatus->loadFromFile = false;
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else {
|
} else {
|
||||||
return initForFirstBlockInFile(pReader, pBlockIter);
|
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!pStatus->loadFromFile) {
|
||||||
|
resetTableListIndex(pStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ====================================== EXPOSED APIs ======================================
|
// ====================================== EXPOSED APIs ======================================
|
||||||
|
@ -4012,7 +4006,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);
|
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);
|
||||||
|
|
||||||
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
|
taosMemoryFree(pReader->status.uidList.tableUidList);
|
||||||
SIOCostSummary* pCost = &pReader->cost;
|
SIOCostSummary* pCost = &pReader->cost;
|
||||||
|
|
||||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||||
|
@ -4066,6 +4060,7 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
if (pBlock->info.rows > 0) {
|
if (pBlock->info.rows > 0) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
|
resetTableListIndex(&pReader->status);
|
||||||
buildBlockFromBufferSequentially(pReader);
|
buildBlockFromBufferSequentially(pReader);
|
||||||
return pBlock->info.rows > 0;
|
return pBlock->info.rows > 0;
|
||||||
}
|
}
|
||||||
|
@ -4309,6 +4304,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
if (pStatus->fileIter.numOfFiles == 0) {
|
if (pStatus->fileIter.numOfFiles == 0) {
|
||||||
pStatus->loadFromFile = false;
|
pStatus->loadFromFile = false;
|
||||||
|
resetTableListIndex(pStatus);
|
||||||
} else {
|
} else {
|
||||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue