refactor: do some internal refactor.
This commit is contained in:
parent
6467b99a52
commit
d8762b9bc9
|
@ -261,8 +261,10 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
|
||||||
pIter->pReader = pReader;
|
pIter->pReader = pReader;
|
||||||
pIter->iStt = iStt;
|
pIter->iStt = iStt;
|
||||||
pIter->backward = backward;
|
pIter->backward = backward;
|
||||||
pIter->verRange = *pRange;
|
pIter->verRange.minVer = pRange->minVer;
|
||||||
pIter->timeWindow = *pTimeWindow;
|
pIter->verRange.maxVer = pRange->maxVer;
|
||||||
|
pIter->timeWindow.skey = pTimeWindow->skey;
|
||||||
|
pIter->timeWindow.ekey = pTimeWindow->ekey;
|
||||||
|
|
||||||
pIter->pBlockLoadInfo = pBlockLoadInfo;
|
pIter->pBlockLoadInfo = pBlockLoadInfo;
|
||||||
|
|
||||||
|
|
|
@ -148,6 +148,7 @@ typedef struct STableUidList {
|
||||||
typedef struct SReaderStatus {
|
typedef struct SReaderStatus {
|
||||||
bool loadFromFile; // check file stage
|
bool loadFromFile; // check file stage
|
||||||
bool composedDataBlock; // the returned data block is a composed block or not
|
bool composedDataBlock; // the returned data block is a composed block or not
|
||||||
|
bool mapDataCleaned; // mapData has been cleaned up alreay or not
|
||||||
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||||
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
|
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
|
||||||
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
|
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
|
||||||
|
@ -875,7 +876,18 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cleanupTableScanInfo(SSHashObj* pTableMap) {
|
static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) {
|
||||||
|
// reset the index in last block when handing a new file
|
||||||
|
tMapDataClear(&pScanInfo->mapData);
|
||||||
|
taosArrayClear(pScanInfo->pBlockList);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void cleanupTableScanInfo(SReaderStatus* pStatus) {
|
||||||
|
if (pStatus->mapDataCleaned) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSHashObj* pTableMap = pStatus->pTableMap;
|
||||||
STableBlockScanInfo** px = NULL;
|
STableBlockScanInfo** px = NULL;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
|
|
||||||
|
@ -885,10 +897,10 @@ static void cleanupTableScanInfo(SSHashObj* pTableMap) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the index in last block when handing a new file
|
doCleanupTableScanInfo(*px);
|
||||||
tMapDataClear(&(*px)->mapData);
|
|
||||||
taosArrayClear((*px)->pBlockList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStatus->mapDataCleaned = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
|
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
|
||||||
|
@ -896,8 +908,10 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
size_t numOfTables = taosArrayGetSize(pIndexList);
|
size_t numOfTables = taosArrayGetSize(pIndexList);
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
cleanupTableScanInfo(pReader->status.pTableMap);
|
cleanupTableScanInfo(&pReader->status);
|
||||||
|
|
||||||
|
// set the flag for the new file
|
||||||
|
pReader->status.mapDataCleaned = false;
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
|
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
|
||||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
|
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
|
||||||
|
@ -3022,8 +3036,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
// load the last data block of current table
|
// load the last data block of current table
|
||||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||||
|
|
||||||
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
// reset the index in last block when handing a new file
|
||||||
if (!hasVal) {
|
doCleanupTableScanInfo(pScanInfo);
|
||||||
|
pStatus->mapDataCleaned = true;
|
||||||
|
|
||||||
|
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||||
|
if (!hasDataInLastFile) {
|
||||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue