refactor(query): support read data from last files.

This commit is contained in:
Haojun Liao 2022-08-17 07:51:09 +08:00
parent d242226d1a
commit 559286db71
1 changed files with 157 additions and 84 deletions

View File

@ -28,6 +28,11 @@ typedef struct {
bool hasVal; bool hasVal;
} SIterInfo; } SIterInfo;
typedef struct {
int32_t numOfBlocks;
int32_t numOfLastBlocks;
} SBlockNumber;
typedef struct STableBlockScanInfo { typedef struct STableBlockScanInfo {
uint64_t uid; uint64_t uid;
TSKEY lastKey; TSKEY lastKey;
@ -71,10 +76,11 @@ typedef struct SBlockLoadSuppInfo {
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
int32_t order; int32_t order;
SArray* pLastBlockList;// last block array list
} SFilesetIter; } SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
@ -84,12 +90,14 @@ typedef struct SFileDataBlockInfo {
} SFileDataBlockInfo; } SFileDataBlockInfo;
typedef struct SDataBlockIter { typedef struct SDataBlockIter {
int32_t numOfBlocks; SBlockNumber numOfBlocks;
int32_t index; int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo> SArray* blockList; // SArray<SFileDataBlockInfo>
SArray* pLastBlockList; // last block list
int32_t order; int32_t order;
SBlock block; // current SBlock data SBlock block; // current SBlock data
SHashObj* pTableMap; SHashObj* pTableMap;
SArray* pBlockL; // ptr to SBlockL in fileIterator
} SDataBlockIter; } SDataBlockIter;
typedef struct SFileBlockDumpInfo { typedef struct SFileBlockDumpInfo {
@ -132,6 +140,7 @@ struct STsdbReader {
STSchema* pSchema; STSchema* pSchema;
SDataFReader* pFileReader; SDataFReader* pFileReader;
SVersionRange verRange; SVersionRange verRange;
SArray* pLastBlock; // last block info
int32_t step; int32_t step;
STsdbReader* innerReader[2]; STsdbReader* innerReader[2];
@ -299,6 +308,13 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32
pIter->pFileList = aDFileSet; pIter->pFileList = aDFileSet;
pIter->numOfFiles = numOfFileset; pIter->numOfFiles = numOfFileset;
pIter->pLastBlockList = taosArrayInit(4, sizeof(SBlockL));
if (pIter->pLastBlockList == NULL) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr);
return code;
}
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -359,7 +375,7 @@ _err:
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) { static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = -1; pIter->numOfBlocks = (SBlockNumber){-1, -1};
if (pIter->blockList == NULL) { if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} else { } else {
@ -592,10 +608,10 @@ _end:
return code; return code;
} }
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables, static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* pLastBlockIndex,
int32_t* numOfBlocks) { SBlockNumber * pBlockNum, SArray* pQualifiedLastBlock) {
int32_t numOfQTable= 0;
size_t numOfTables = taosArrayGetSize(pIndexList); size_t numOfTables = taosArrayGetSize(pIndexList);
*numOfValidTables = 0;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
size_t size = 0; size_t size = 0;
@ -640,19 +656,44 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
(*numOfBlocks) += 1; pBlockNum->numOfBlocks += 1;
} }
if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) { if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
(*numOfValidTables) += 1; numOfQTable += 1;
} }
} }
double el = (taosGetTimestampUs() - st) / 1000.0; size_t numOfLast = taosArrayGetSize(pLastBlockIndex);
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s", for(int32_t i = 0; i < numOfLast; ++i) {
numOfTables, *numOfBlocks, *numOfValidTables, size / 1000.0, el, pReader->idStr); SBlockL* pLastBlock = taosArrayGet(pLastBlockIndex, i);
if (pLastBlock->suid != pReader->suid) {
continue;
}
pReader->cost.numOfBlocks += (*numOfBlocks); {
// 1. time range check, todo add later
// if (pLastBlock->.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
// continue;
// }
// 2. version range check
if (pLastBlock->minVer > pReader->verRange.maxVer || pLastBlock->maxVer < pReader->verRange.minVer) {
continue;
}
pBlockNum->numOfLastBlocks += 1;
taosArrayPush(pQualifiedLastBlock, &i);
}
}
int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s",
numOfTables, total, numOfQTable, pBlockNum->numOfLastBlocks, size / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el; pReader->cost.headFileLoadTime += el;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -682,8 +723,12 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); if (pBlockIter->index >= pBlockIter->numOfBlocks.numOfBlocks) {
return pFBlockInfo; return NULL;
} else {
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
return pFBlockInfo;
}
} }
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
@ -771,41 +816,45 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
double elapsedTime = 0;
int32_t code = 0;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; if (pFBlock != NULL) {
int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData /*, pSupInfo->colIds, numOfCols*/); SBlock* pBlock = getCurrentBlock(pBlockIter);
if (code != TSDB_CODE_SUCCESS) { code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
goto _error; if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
} else {
int32_t pos = pBlockIter->index - pBlockIter->numOfBlocks.numOfBlocks;
int32_t* index = taosArrayGet(pBlockIter->pLastBlockList, pos);
SBlockL* pBlockL = taosArrayGet(pReader->status.fileIter.pLastBlockList, *index);
code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData);
} }
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.blockLoadTime += elapsedTime; pReader->cost.blockLoadTime += elapsedTime;
pDumpInfo->allDumped = false; pDumpInfo->allDumped = false;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 // tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, %s", // ", rows:%d, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, // pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pReader->idStr); // pReader->idStr);
return code; return code;
} }
@ -861,10 +910,11 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); if (pFBlock != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock); tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock);
}
#if 0 #if 0
qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset); qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
@ -873,12 +923,17 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) { static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockNumber* pBlockNum, SArray* pQLastBlock) {
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
pBlockIter->numOfBlocks = numOfBlocks; pBlockIter->numOfBlocks = *pBlockNum;
taosArrayClear(pBlockIter->blockList); taosArrayClear(pBlockIter->blockList);
if (pBlockNum->numOfLastBlocks > 0) {
taosArrayDestroy(pBlockIter->pLastBlockList);
pBlockIter->pLastBlockList = pQLastBlock;
}
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
@ -930,20 +985,21 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
sup.numOfTables += 1; sup.numOfTables += 1;
} }
ASSERT(numOfBlocks == cnt); ASSERT(pBlockNum->numOfBlocks == cnt);
int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;
// since there is only one table qualified, blocks are not sorted // since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) { if (sup.numOfTables == 1 || pBlockNum->numOfBlocks == 0) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < pBlockNum->numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i}; SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
taosArrayPush(pBlockIter->blockList, &blockInfo); taosArrayPush(pBlockIter->blockList, &blockInfo);
} }
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
pReader, cnt, (et - st) / 1000.0, pReader->idStr); pReader, total, (et - st) / 1000.0, pReader->idStr);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1); pBlockIter->index = asc ? 0 : (total - 1);
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -952,7 +1008,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
pReader->idStr); pReader->idStr);
assert(cnt <= numOfBlocks && sup.numOfTables <= numOfTables); ASSERT(cnt <= pBlockNum->numOfBlocks && sup.numOfTables <= numOfTables);
SMultiwayMergeTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
@ -979,12 +1035,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
} }
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et - st) / 1000.0, tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, total, (et - st) / 1000.0,
pReader->idStr); pReader->idStr);
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
taosMemoryFree(pTree); taosMemoryFree(pTree);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1); pBlockIter->index = asc ? 0 : (total - 1);
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -994,7 +1050,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
bool asc = ASCENDING_TRAVERSE(pBlockIter->order); bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) { if ((pBlockIter->index >= pBlockIter->numOfBlocks.numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) {
return false; return false;
} }
@ -1041,7 +1097,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
int32_t index = pBlockIter->index; int32_t index = pBlockIter->index;
while (index < pBlockIter->numOfBlocks && index >= 0) { while (index < pBlockIter->numOfBlocks.numOfBlocks && index >= 0) {
SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
return index; return index;
@ -1055,7 +1111,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
} }
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) { static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) { if (index < 0 || index >= pBlockIter->numOfBlocks.numOfBlocks) {
return -1; return -1;
} }
@ -1473,9 +1529,12 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
} }
} }
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
@ -1696,7 +1755,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
return key; return key;
} }
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pQLastBlock) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
@ -1715,18 +1774,25 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
return code; return code;
} }
if (taosArrayGetSize(pIndexList) > 0) { code = tsdbReadBlockL(pReader->pFileReader, pStatus->fileIter.pLastBlockList);
uint32_t numOfValidTable = 0; if (code != TSDB_CODE_SUCCESS) {
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks); taosArrayDestroy(pIndexList);
return code;
}
if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pStatus->fileIter.pLastBlockList) > 0) {
code = doLoadFileBlock(pReader, pIndexList, pStatus->fileIter.pLastBlockList, pBlockNum, pQLastBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList); taosArrayDestroy(pIndexList);
return code; return code;
} }
if (numOfValidTable > 0) { if (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0) {
ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);
break; break;
} }
} }
// no blocks in current file, try next files // no blocks in current file, try next files
} }
@ -1741,21 +1807,27 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pScanInfo = NULL;
SBlock* pBlock = NULL;
TSDBKEY key = {0};
SBlock* pBlock = getCurrentBlock(pBlockIter); if (pFBlock != NULL) {
pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
pBlock = getCurrentBlock(pBlockIter);
key = getCurrentKeyInBuf(pBlockIter, pReader);
}
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader); if (pFBlock == NULL || fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData);
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// build composed data block // build composed data block
code = buildComposedDataBlock(pReader, pScanInfo); code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) { } else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block // data in memory that are earlier than current file block
// todo rows in buffer should be less than the file block in asc, greater than file block in desc // todo rows in buffer should be less than the file block in asc, greater than file block in desc
@ -1819,20 +1891,23 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
} }
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int32_t numOfBlocks = 0; SBlockNumber num = {0};
int32_t code = moveToNextFile(pReader, &numOfBlocks); SArray* pQLastBlock = taosArrayInit(4, sizeof(int32_t));
int32_t code = moveToNextFile(pReader, &num, pQLastBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// all data files are consumed, try data in buffer // all data files are consumed, try data in buffer
if (numOfBlocks == 0) { if (num.numOfBlocks + num.numOfLastBlocks == 0) {
taosArrayDestroy(pQLastBlock);
pReader->status.loadFromFile = false; pReader->status.loadFromFile = false;
return code; return code;
} }
// initialize the block iterator for a new fileset // initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks); code = initBlockIterator(pReader, pBlockIter, &num, pQLastBlock);
// set the correct start position according to the query time window // set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
@ -1851,13 +1926,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
while (1) { while (1) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded if (fileBlockPartiallyRead(pDumpInfo, asc)) { // file data block is partially loaded
code = buildComposedDataBlock(pReader, pScanInfo); code = buildComposedDataBlock(pReader);
} else { } else {
// current block are exhausted, try the next file block // current block are exhausted, try the next file block
if (pDumpInfo->allDumped) { if (pDumpInfo->allDumped) {
@ -2209,7 +2281,7 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
// 3. load the neighbor block, and set it to be the currently accessed file data block // 3. load the neighbor block, and set it to be the currently accessed file data block
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2569,6 +2641,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
pCond->order = TSDB_ORDER_ASC; pCond->order = TSDB_ORDER_ASC;
} }
// here we only need one more row, so the capacity is set to be ONE.
code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr); code = tsdbReaderCreate(pVnode, pCond, &pReader->innerReader[0], 1, idstr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
@ -2872,7 +2945,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData); tBlockDataClear(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData, 1); tBlockDataDestroy(&pStatus->fileBlockData, 1);
@ -2968,12 +3041,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles; pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;
if (pBlockIter->numOfBlocks > 0) { if (pBlockIter->numOfBlocks.numOfBlocks > 0) {
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks; pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks.numOfBlocks;
} }
pTableBlockInfo->numOfTables = numOfTables; pTableBlockInfo->numOfTables = numOfTables;
bool hasNext = (pBlockIter->numOfBlocks > 0); bool hasNext = (pBlockIter->numOfBlocks.numOfBlocks > 0);
while (true) { while (true) {
if (hasNext) { if (hasNext) {
@ -3004,8 +3077,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
break; break;
} }
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks; pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks.numOfBlocks;
hasNext = (pBlockIter->numOfBlocks > 0); hasNext = (pBlockIter->numOfBlocks.numOfBlocks > 0);
} }
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,