Merge pull request #23924 from taosdata/fix/3_liaohj

enh(tsdb): extract rows for the given tables in all stt files.
This commit is contained in:
Haojun Liao 2023-12-04 19:21:19 +08:00 committed by GitHub
commit cc5890d56a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 331 additions and 179 deletions

View File

@ -258,8 +258,6 @@ typedef struct SQueryTableDataCond {
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData); void colDataDestroy(SColumnInfoData* pColData);
//====================================================================================================================== //======================================================================================================================
@ -294,7 +292,7 @@ typedef struct STableBlockDistInfo {
int32_t defMaxRows; int32_t defMaxRows;
int32_t firstSeekTimeUs; int32_t firstSeekTimeUs;
uint32_t numOfInmemRows; uint32_t numOfInmemRows;
uint32_t numOfSmallBlocks; uint32_t numOfSttRows;
uint32_t numOfVgroups; uint32_t numOfVgroups;
int32_t blockRowsHisto[20]; int32_t blockRowsHisto[20];
} STableBlockDistInfo; } STableBlockDistInfo;

View File

@ -153,26 +153,6 @@ typedef struct STsdbReader STsdbReader;
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8 #define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbReaderOpen(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
SHashObj **pIgnoreTables);
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
void tsdbReaderSetId(STsdbReader *pReader, const char *idstr);
void tsdbReaderClose(STsdbReader *pReader);
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);
void tsdbReleaseDataBlock(STsdbReader *pReader);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
//======================================================================================================================
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly, SSDataBlock *pResBlock, void **ppReader, const char *idstr, bool countOnly,
SHashObj **pIgnoreTables); SHashObj **pIgnoreTables);
@ -190,7 +170,6 @@ void *tsdbGetIdx2(SMeta *pMeta);
void *tsdbGetIvtIdx2(SMeta *pMeta); void *tsdbGetIvtIdx2(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader); uint64_t tsdbGetReaderMaxVersion2(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp2(SVnode *pVnode, void *pTableList, int32_t numOfTables, const char *pIdStr);
//====================================================================================================================== //======================================================================================================================
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);

View File

@ -886,10 +886,9 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost); void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo *pLoadCost);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================

View File

@ -22,7 +22,7 @@
static void tLDataIterClose2(SLDataIter *pIter); static void tLDataIterClose2(SLDataIter *pIter);
// SLDataIter ================================================= // SLDataIter =================================================
SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) { SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo)); SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo));
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -61,7 +61,7 @@ void getSttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, SSttBlockLoadCostInfo* pL
} }
} }
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
return NULL; return NULL;
} }
@ -78,14 +78,19 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
pInfo->sttBlockIndex = -1; pInfo->sttBlockIndex = -1;
pInfo->pin = false; pInfo->pin = false;
if (pLoadInfo->statisBlock != NULL) {
tStatisBlockDestroy(pLoadInfo->statisBlock);
taosMemoryFreeClear(pLoadInfo->statisBlock);
}
taosArrayDestroy(pLoadInfo->aSttBlk); taosArrayDestroy(pLoadInfo->aSttBlk);
taosMemoryFree(pLoadInfo); taosMemoryFree(pLoadInfo);
return NULL; return NULL;
} }
static void destroyLDataIter(SLDataIter *pIter) { void destroyLDataIter(SLDataIter *pIter) {
tLDataIterClose2(pIter); tLDataIterClose2(pIter);
destroyLastBlockLoadInfo(pIter->pBlockLoadInfo); destroySttBlockLoadInfo(pIter->pBlockLoadInfo);
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
@ -732,25 +737,6 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
return -1 * tLDataIterCmprFn(p1, p2); return -1 * tLDataIterCmprFn(p1, p2);
} }
static void adjustValidLDataIters(SArray *pLDIterList, int32_t numOfFileObj) {
int32_t size = taosArrayGetSize(pLDIterList);
if (size < numOfFileObj) {
int32_t inc = numOfFileObj - size;
for (int32_t k = 0; k < inc; ++k) {
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
taosArrayPush(pLDIterList, &pIter);
}
} else if (size > numOfFileObj) { // remove unused LDataIter
int32_t inc = size - numOfFileObj;
for (int i = 0; i < inc; ++i) {
SLDataIter *pIter = taosArrayPop(pLDIterList);
destroyLDataIter(pIter);
}
}
}
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -773,19 +759,13 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
} }
// add the list/iter placeholder // add the list/iter placeholder
while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < numOfLevels) { adjustLDataIters(pConf->pSttFileBlockIterArray, pConf->pCurrentFileset);
SArray *pList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pConf->pSttFileBlockIterArray, &pList);
}
for (int32_t j = 0; j < numOfLevels; ++j) { for (int32_t j = 0; j < numOfLevels; ++j) {
SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j]; SSttLvl *pSttLevel = ((STFileSet *)pConf->pCurrentFileset)->lvlArr->data[j];
SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j); SArray *pList = taosArrayGetP(pConf->pSttFileBlockIterArray, j);
int32_t numOfFileObj = TARRAY2_SIZE(pSttLevel->fobjArr); for (int32_t i = 0; i < TARRAY2_SIZE(pSttLevel->fobjArr); ++i) { // open all last file
adjustValidLDataIters(pList, numOfFileObj);
for (int32_t i = 0; i < numOfFileObj; ++i) { // open all last file
SLDataIter *pIter = taosArrayGetP(pList, i); SLDataIter *pIter = taosArrayGetP(pList, i);
SSttFileReader *pSttFileReader = pIter->pReader; SSttFileReader *pSttFileReader = pIter->pReader;
@ -805,7 +785,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
} }
if (pLoadInfo == NULL) { if (pLoadInfo == NULL) {
pLoadInfo = tCreateOneLastBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
} }
memset(pIter, 0, sizeof(SLDataIter)); memset(pIter, 0, sizeof(SLDataIter));

View File

@ -23,14 +23,14 @@
#include "tsimplehash.h" #include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) #define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader); STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
@ -52,10 +52,10 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader); static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo); static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
@ -138,16 +138,16 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
pIter->pFilesetList = pFileSetArray; pIter->pFilesetList = pFileSetArray;
pIter->numOfFiles = numOfFileset; pIter->numOfFiles = numOfFileset;
if (pIter->pLastBlockReader == NULL) { if (pIter->pSttBlockReader == NULL) {
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader)); pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader));
if (pIter->pLastBlockReader == NULL) { if (pIter->pSttBlockReader == NULL) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_OUT_OF_MEMORY;
tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr); tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
return code; return code;
} }
} }
SLastBlockReader* pLReader = pIter->pLastBlockReader; SSttBlockReader* pLReader = pIter->pSttBlockReader;
pLReader->order = pReader->info.order; pLReader->order = pReader->info.order;
pLReader->window = pReader->info.window; pLReader->window = pReader->info.window;
pLReader->verRange = pReader->info.verRange; pLReader->verRange = pReader->info.verRange;
@ -171,8 +171,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
SReadCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
pIter->pLastBlockReader->uid = 0; pIter->pSttBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pSttBlockReader->mergeTree);
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
@ -1404,26 +1404,26 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
return code; return code;
} }
static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
SVersionRange* pVerRange) { SVersionRange* pVerRange) {
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pSttBlockReader->order) ? 1 : -1;
while (1) { while (1) {
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
if (!hasVal) { // the next value will be the accessed key in stt if (!hasVal) { // the next value will be the accessed key in stt
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA; pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey += step; pScanInfo->sttKeyInfo.nextProcKey += step;
return false; return false;
} }
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow]; int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
pLastBlockReader->currentKey = key; pSttBlockReader->currentKey = key;
pScanInfo->sttKeyInfo.nextProcKey = key; pScanInfo->sttKeyInfo.nextProcKey = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order, if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pSttBlockReader->order,
pVerRange)) { pVerRange)) {
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA; pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
return true; return true;
@ -1431,26 +1431,26 @@ static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlock
} }
} }
static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { static void doPinSttBlock(SSttBlockReader* pSttBlockReader) {
tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); tMergeTreePinSttBlock(&pSttBlockReader->mergeTree);
} }
static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) { static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) {
tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree); tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree);
} }
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader, static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
bool* copied) { bool* copied) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
*copied = false; *copied = false;
// avoid the fetch next row replace the referenced stt block in buffer // avoid the fetch next row replace the referenced stt block in buffer
doPinSttBlock(pLastBlockReader); doPinSttBlock(pSttBlockReader);
bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange); bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
doUnpinSttBlock(pLastBlockReader); doUnpinSttBlock(pSttBlockReader);
if (hasVal) { if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader);
if (next1 != ts) { if (next1 != ts) {
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow); code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
if (code) { if (code) {
@ -1507,15 +1507,15 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
} }
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SIterInfo* pIter, int64_t key, SSttBlockReader* pSttBlockReader) {
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInLastBlock(pLastBlockReader)) { if (hasDataInSttBlock(pSttBlockReader)) {
tsLast = getCurrentKeyInLastBlock(pLastBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY k = TSDBROW_KEY(pRow);
@ -1533,7 +1533,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t minKey = 0; int64_t minKey = 0;
if (pReader->info.order == TSDB_ORDER_ASC) { if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) { if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1546,7 +1546,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
} else { } else {
minKey = INT64_MIN; minKey = INT64_MIN;
if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) { if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
@ -1571,12 +1571,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
} }
@ -1621,12 +1621,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL); int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
} }
@ -1652,27 +1652,27 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) { bool mergeBlockData) {
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
bool copied = false; bool copied = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
// create local variable to hold the row value // create local variable to hold the row value
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr); fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
// only stt block exists // only stt block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied); code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
if (code) { if (code) {
return code; return code;
} }
@ -1686,9 +1686,9 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
@ -1711,7 +1711,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
// merge with block data if ts == key // merge with block data if ts == key
@ -1737,8 +1737,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
@ -1753,24 +1753,24 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (hasDataInFileBlock(pBlockData, pDumpInfo)) { if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
// no last block available, only data block exists // no last block available, only data block exists
if (!hasDataInLastBlock(pLastBlockReader)) { if (!hasDataInSttBlock(pSttBlockReader)) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} }
// row in last file block // row in last file block
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader); int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < tsLast) { if (key < tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key > tsLast) { } else if (key > tsLast) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
} else { } else {
if (key > tsLast) { if (key > tsLast) {
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key < tsLast) { } else if (key < tsLast) {
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
} }
// the following for key == tsLast // the following for key == tsLast
@ -1782,13 +1782,13 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL); code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1802,12 +1802,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return code; return code;
} else { // only last block exists } else { // only last block exists
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
} }
} }
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) { SSttBlockReader* pSttBlockReader) {
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1818,8 +1818,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
int64_t tsLast = INT64_MIN; int64_t tsLast = INT64_MIN;
if (hasDataInLastBlock(pLastBlockReader)) { if (hasDataInSttBlock(pSttBlockReader)) {
tsLast = getCurrentKeyInLastBlock(pLastBlockReader); tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
} }
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN; int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key; minKey = key;
} }
if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) { if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
} else { } else {
@ -1884,7 +1884,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
minKey = key; minKey = key;
} }
if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) { if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
minKey = tsLast; minKey = tsLast;
} }
} }
@ -1903,13 +1903,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL); code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
} }
@ -1962,13 +1962,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
if (minKey == tsLast) { if (minKey == tsLast) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
code = tsdbRowMergerAdd(pMerger, pRow1, NULL); code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
} }
@ -2087,10 +2087,10 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return true; return true;
} }
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { static bool initSttBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table. // the last block reader has been initialized for this table.
if (pLBlockReader->uid == pScanInfo->uid) { if (pLBlockReader->uid == pScanInfo->uid) {
return hasDataInLastBlock(pLBlockReader); return hasDataInSttBlock(pLBlockReader);
} }
if (pLBlockReader->uid != 0) { if (pLBlockReader->uid != 0) {
@ -2139,13 +2139,13 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange); code = nextRowFromSttBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
int64_t el = taosGetTimestampUs() - st; int64_t el = taosGetTimestampUs() - st;
pReader->cost.initLastBlockReader += (el / 1000.0); pReader->cost.initSttBlockReader += (el / 1000.0);
tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
return code; return code;
} }
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
@ -2201,7 +2201,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} }
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SBlockData* pBlockData, SSttBlockReader* pSttBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL; TSDBROW *pRow = NULL, *piRow = NULL;
@ -2218,21 +2218,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
// two levels of mem-table does contain the valid rows // two levels of mem-table does contain the valid rows
if (pRow != NULL && piRow != NULL) { if (pRow != NULL && piRow != NULL) {
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pSttBlockReader);
} }
// imem + file + last block // imem + file + last block
if (pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader);
} }
// mem + file + last block // mem + file + last block
if (pBlockScanInfo->iter.hasVal) { if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader);
} }
// files data blocks + last block // files data blocks + last block
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData); return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData);
} }
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
@ -2293,7 +2293,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
@ -2342,7 +2342,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader); initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
while (1) { while (1) {
bool hasBlockData = false; bool hasBlockData = false;
@ -2376,7 +2376,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
break; break;
} }
code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pSttBlockReader);
if (code) { if (code) {
goto _end; goto _end;
} }
@ -2561,9 +2561,9 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
return (pStatus->pTableIter != NULL); return (pStatus->pTableIter != NULL);
} }
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -2601,7 +2601,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
continue; continue;
} }
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); bool hasDataInLastFile = initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) { if (!hasDataInLastFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus); bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) { if (!hasNexTable) {
@ -2613,14 +2613,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (1) { while (1) {
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader); bool hasBlockLData = hasDataInSttBlock(pSttBlockReader);
// no data in last block and block, no need to proceed. // no data in last block and block, no need to proceed.
if (hasBlockLData == false) { if (hasBlockLData == false) {
break; break;
} }
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
if (code) { if (code) {
return code; return code;
} }
@ -2667,7 +2667,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
STableBlockScanInfo* pScanInfo = NULL; STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
bool asc = ASCENDING_TRAVERSE(pReader->info.order); bool asc = ASCENDING_TRAVERSE(pReader->info.order);
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
@ -2685,7 +2685,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
} }
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
initLastBlockReader(pLastBlockReader, pScanInfo, pReader); initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
} }
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
@ -2730,13 +2730,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
// let's load data from stt files // let's load data from stt files
initLastBlockReader(pLastBlockReader, pScanInfo, pReader); initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
// no data in last block, no need to proceed. // no data in last block, no need to proceed.
while (hasDataInLastBlock(pLastBlockReader)) { while (hasDataInSttBlock(pSttBlockReader)) {
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
@ -2746,7 +2746,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
} }
// data in stt now overlaps with current active file data block, need to composed with file data block. // data in stt now overlaps with current active file data block, need to composed with file data block.
int64_t lastKeyInStt = getCurrentKeyInLastBlock(pLastBlockReader); int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) || if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) ||
(lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) { (lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) {
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader, tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
@ -2816,11 +2816,11 @@ _end:
static int32_t doSumSttBlockRows(STsdbReader* pReader) { static int32_t doSumSttBlockRows(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL; SSttBlockLoadInfo* pBlockLoadInfo = NULL;
#if 0 #if 0
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pLastBlockReader->pInfo[i]; pBlockLoadInfo = &pSttBlockReader->pInfo[i];
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk); code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) { if (code) {
@ -3020,7 +3020,7 @@ typedef enum {
TSDB_READ_CONTINUE = 0x2, TSDB_READ_CONTINUE = 0x2,
} ERetrieveType; } ERetrieveType;
static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
@ -3030,7 +3030,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
while (1) { while (1) {
terrno = 0; terrno = 0;
code = doLoadLastBlockSequentially(pReader); code = doLoadSttBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
terrno = code; terrno = code;
return TSDB_READ_RETURN; return TSDB_READ_RETURN;
@ -3067,7 +3067,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (pBlockIter->numOfBlocks == 0) { if (pBlockIter->numOfBlocks == 0) {
// let's try to extract data from stt files. // let's try to extract data from stt files.
ERetrieveType type = doReadDataFromLastFiles(pReader); ERetrieveType type = doReadDataFromSttFiles(pReader);
if (type == TSDB_READ_RETURN) { if (type == TSDB_READ_RETURN) {
return terrno; return terrno;
} }
@ -3102,7 +3102,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
resetDataBlockIterator(pBlockIter, pReader->info.order); resetDataBlockIterator(pBlockIter, pReader->info.order);
resetTableListIndex(&pReader->status); resetTableListIndex(&pReader->status);
ERetrieveType type = doReadDataFromLastFiles(pReader); ERetrieveType type = doReadDataFromSttFiles(pReader);
if (type == TSDB_READ_RETURN) { if (type == TSDB_READ_RETURN) {
return terrno; return terrno;
} }
@ -3434,12 +3434,12 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) { SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) { while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader);
if (next1 == ts) { if (next1 == ts) {
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else { } else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
@ -4097,8 +4097,8 @@ void tsdbReaderClose2(STsdbReader* pReader) {
SReadCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) { if (pFilesetIter->pSttBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
taosMemoryFree(pLReader); taosMemoryFree(pLReader);
} }
@ -4121,12 +4121,12 @@ void tsdbReaderClose2(STsdbReader* pReader) {
"build in-memory-block-time:%.2f ms, sttBlocks:%" PRId64 ", sttBlocks-time:%.2f ms, sttStatisBlock:%" PRId64 "build in-memory-block-time:%.2f ms, sttBlocks:%" PRId64 ", sttBlocks-time:%.2f ms, sttStatisBlock:%" PRId64
", stt-statis-Block-time:%.2f ms, composed-blocks:%" PRId64 ", stt-statis-Block-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f " ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f "
"ms, initLastBlockReader:%.2fms, %s", "ms, initSttBlockReader:%.2fms, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks, pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttCost.loadBlocks, pCost->sttCost.blockElapsedTime, pCost->blockLoadTime, pCost->buildmemBlock, pCost->sttCost.loadBlocks, pCost->sttCost.blockElapsedTime,
pCost->sttCost.loadStatisBlocks, pCost->sttCost.statisElapsedTime, pCost->composedBlocks, pCost->sttCost.loadStatisBlocks, pCost->sttCost.statisElapsedTime, pCost->composedBlocks,
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList, pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pCost->createSkylineIterTime, pCost->initLastBlockReader, pReader->idStr); pCost->createSkylineIterTime, pCost->initSttBlockReader, pReader->idStr);
taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->idStr);
@ -4730,7 +4730,20 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
return code; return code;
} }
} }
SMergeTreeConf conf = {
.pReader = pReader,
.pSchema = pReader->info.pSchema,
.pCols = pReader->suppInfo.colId,
.numOfCols = pReader->suppInfo.numOfCols,
.suid = pReader->info.suid,
};
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
if (pStatus->pCurrentFileset != NULL) {
pTableBlockInfo->numOfSttRows += tsdbGetRowsInSttFiles(pStatus->pCurrentFileset, pStatus->pLDataIterArray,
pReader->pTsdb, &conf, pReader->idStr);
}
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg; STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
pTableBlockInfo->defMinRows = pc->minRows; pTableBlockInfo->defMinRows = pc->minRows;
@ -4767,10 +4780,6 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
pTableBlockInfo->minRows = numOfRows; pTableBlockInfo->minRows = numOfRows;
} }
if (numOfRows < defaultRows) {
pTableBlockInfo->numOfSmallBlocks += 1;
}
pTableBlockInfo->totalSize += pBlockInfo->record.blockSize; pTableBlockInfo->totalSize += pBlockInfo->record.blockSize;
int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBuckets); int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows, numOfBuckets);
@ -4783,13 +4792,18 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
break; break;
} }
// add the data in stt files of new fileset
if (pStatus->pCurrentFileset != NULL) {
pTableBlockInfo->numOfSttRows += tsdbGetRowsInSttFiles(pStatus->pCurrentFileset, pStatus->pLDataIterArray,
pReader->pTsdb, &conf, pReader->idStr);
}
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks; pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
hasNext = (pBlockIter->numOfBlocks > 0); hasNext = (pBlockIter->numOfBlocks > 0);
} }
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
} }
// record the data in stt files
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
return code; return code;
} }
@ -4976,7 +4990,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
taosMemoryFreeClear(pReader->idStr); taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr); pReader->idStr = taosStrdup(idstr);
pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr; pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr;
} }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/

View File

@ -488,6 +488,8 @@ typedef enum {
BLK_CHECK_QUIT = 0x2, BLK_CHECK_QUIT = 0x2,
} ETombBlkCheckEnum; } ETombBlkCheckEnum;
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo,
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j);
static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j, static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_t numOfTables, int32_t* j,
ETombBlkCheckEnum* pRet) { ETombBlkCheckEnum* pRet) {
int32_t code = 0; int32_t code = 0;
@ -659,3 +661,177 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT
} }
} }
} }
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid,
const uint64_t* pUidList, int32_t numOfTables) {
int32_t num = 0;
const TStatisBlkArray *pStatisBlkArray = pBlockLoadInfo->pSttStatisBlkArray;
if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
return 0;
}
int32_t i = 0;
while((i < TARRAY2_SIZE(pStatisBlkArray)) && (pStatisBlkArray->data[i].minTbid.suid < suid)) {
++i;
}
if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
return 0;
}
SStatisBlk *p = &pStatisBlkArray->data[i];
if (pBlockLoadInfo->statisBlock == NULL) {
pBlockLoadInfo->statisBlock = taosMemoryCalloc(1, sizeof(STbStatisBlock));
tStatisBlockInit(pBlockLoadInfo->statisBlock);
}
int64_t st = taosGetTimestampMs();
tsdbSttFileReadStatisBlock(pSttFileReader, p, pBlockLoadInfo->statisBlock);
pBlockLoadInfo->statisBlockIndex = i;
double el = (taosGetTimestampMs() - st) / 1000.0;
pBlockLoadInfo->cost.loadStatisBlocks += 1;
pBlockLoadInfo->cost.statisElapsedTime += el;
STbStatisBlock *pBlock = pBlockLoadInfo->statisBlock;
int32_t index = 0;
while (index < TARRAY2_SIZE(pBlock->suid) && pBlock->suid->data[index] < suid) {
++index;
}
if (index >= TARRAY2_SIZE(pBlock->suid)) {
return num;
}
int32_t j = index;
int32_t uidIndex = 0;
while (i < TARRAY2_SIZE(pStatisBlkArray) && uidIndex <= numOfTables) {
p = &pStatisBlkArray->data[i];
if (p->minTbid.suid > suid) {
return num;
}
uint64_t uid = pUidList[uidIndex];
if (pBlock->uid->data[j] == uid) {
num += pBlock->count->data[j];
uidIndex += 1;
j += 1;
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j);
} else if (pBlock->uid->data[j] < uid) {
j += 1;
loadNextStatisticsBlock(pSttFileReader, pBlockLoadInfo, pStatisBlkArray, pBlock->suid->size, &i, &j);
} else {
uidIndex += 1;
}
}
return num;
}
// load next stt statistics block
static void loadNextStatisticsBlock(SSttFileReader* pSttFileReader, const SSttBlockLoadInfo* pBlockLoadInfo,
const TStatisBlkArray* pStatisBlkArray, int32_t numOfRows, int32_t* i, int32_t* j) {
if ((*j) >= numOfRows) {
(*i) += 1;
(*j) = 0;
if ((*i) < TARRAY2_SIZE(pStatisBlkArray)) {
tsdbSttFileReadStatisBlock(pSttFileReader, &pStatisBlkArray->data[(*i)], pBlockLoadInfo->statisBlock);
}
}
}
void doAdjustValidDataIters(SArray* pLDIterList, int32_t numOfFileObj) {
int32_t size = taosArrayGetSize(pLDIterList);
if (size < numOfFileObj) {
int32_t inc = numOfFileObj - size;
for (int32_t k = 0; k < inc; ++k) {
SLDataIter *pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
taosArrayPush(pLDIterList, &pIter);
}
} else if (size > numOfFileObj) { // remove unused LDataIter
int32_t inc = size - numOfFileObj;
for (int i = 0; i < inc; ++i) {
SLDataIter *pIter = taosArrayPop(pLDIterList);
destroyLDataIter(pIter);
}
}
}
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) {
int32_t numOfLevels = pFileSet->lvlArr->size;
// add the list/iter placeholder
while (taosArrayGetSize(pSttFileBlockIterArray) < numOfLevels) {
SArray* pList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pSttFileBlockIterArray, &pList);
}
for(int32_t j = 0; j < numOfLevels; ++j) {
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
doAdjustValidDataIters(pList, TARRAY2_SIZE(pSttLevel->fobjArr));
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
const char* pstr) {
int32_t numOfRows = 0;
// no data exists, go to end
int32_t numOfLevels = pFileSet->lvlArr->size;
if (numOfLevels == 0) {
return numOfRows;
}
// add the list/iter placeholder
adjustLDataIters(pSttFileBlockIterArray, pFileSet);
for (int32_t j = 0; j < numOfLevels; ++j) {
SSttLvl* pSttLevel = pFileSet->lvlArr->data[j];
SArray* pList = taosArrayGetP(pSttFileBlockIterArray, j);
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { // open all last file
SLDataIter* pIter = taosArrayGetP(pList, i);
// open stt file reader if not opened yet
// if failed to open this stt file, ignore the error and try next one
if (pIter->pReader == NULL) {
SSttFileReaderConfig conf = {.tsdb = pTsdb, .szPage = pTsdb->pVnode->config.tsdbPageSize};
conf.file[0] = *pSttLevel->fobjArr->data[i]->f;
const char* pName = pSttLevel->fobjArr->data[i]->fname;
int32_t code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("open stt file reader error. file:%s, code %s, %s", pName, tstrerror(code), pstr);
continue;
}
}
if (pIter->pBlockLoadInfo == NULL) {
pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols);
}
// load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pIter->pBlockLoadInfo->pSttStatisBlkArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr);
continue;
}
// extract rows from each stt file one-by-one
STsdbReader* pReader = pConf->pReader;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
uint64_t* pUidList = pReader->status.uidList.tableUidList;
numOfRows += getNumOfRowsInSttBlock(pIter->pReader, pIter->pBlockLoadInfo, pConf->suid, pUidList, numOfTables);
}
}
return numOfRows;
}

View File

@ -109,7 +109,7 @@ typedef struct SReadCostSummary {
double buildComposedBlockTime; double buildComposedBlockTime;
double createScanInfoList; double createScanInfoList;
double createSkylineIterTime; double createSkylineIterTime;
double initLastBlockReader; double initSttBlockReader;
} SReadCostSummary; } SReadCostSummary;
typedef struct STableUidList { typedef struct STableUidList {
@ -145,21 +145,21 @@ typedef struct SBlockLoadSuppInfo {
bool smaValid; // the sma on all queried columns are activated bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SLastBlockReader { typedef struct SSttBlockReader {
STimeWindow window; STimeWindow window;
SVersionRange verRange; SVersionRange verRange;
int32_t order; int32_t order;
uint64_t uid; uint64_t uid;
SMergeTree mergeTree; SMergeTree mergeTree;
int64_t currentKey; int64_t currentKey;
} SLastBlockReader; } SSttBlockReader;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
TFileSetArray* pFilesetList; // data file set list TFileSetArray* pFilesetList; // data file set list
int32_t order; int32_t order;
SLastBlockReader* pLastBlockReader; // last file block reader SSttBlockReader* pSttBlockReader; // last file block reader
} SFilesetIter; } SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
@ -262,7 +262,12 @@ bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
int32_t getNumOfRowsInSttBlock(SSttFileReader *pSttFileReader, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid,
const uint64_t* pUidList, int32_t numOfTables);
void destroyLDataIter(SLDataIter* pIter);
int32_t adjustLDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet);
int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf,
const char* pstr);
typedef struct { typedef struct {
SArray* pTombData; SArray* pTombData;
} STableLoadInfo; } STableLoadInfo;

View File

@ -5472,7 +5472,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDistInfo p1 = {0}; STableBlockDistInfo p1 = {0};
@ -5481,6 +5480,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
pDistInfo->numOfBlocks += p1.numOfBlocks; pDistInfo->numOfBlocks += p1.numOfBlocks;
pDistInfo->numOfTables += p1.numOfTables; pDistInfo->numOfTables += p1.numOfTables;
pDistInfo->numOfInmemRows += p1.numOfInmemRows; pDistInfo->numOfInmemRows += p1.numOfInmemRows;
pDistInfo->numOfSttRows += p1.numOfSttRows;
pDistInfo->totalSize += p1.totalSize; pDistInfo->totalSize += p1.totalSize;
pDistInfo->totalRows += p1.totalRows; pDistInfo->totalRows += p1.totalRows;
pDistInfo->numOfFiles += p1.numOfFiles; pDistInfo->numOfFiles += p1.numOfFiles;
@ -5488,7 +5488,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
pDistInfo->defMinRows = p1.defMinRows; pDistInfo->defMinRows = p1.defMinRows;
pDistInfo->defMaxRows = p1.defMaxRows; pDistInfo->defMaxRows = p1.defMaxRows;
pDistInfo->rowSize = p1.rowSize; pDistInfo->rowSize = p1.rowSize;
pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks;
if (pDistInfo->minRows > p1.minRows) { if (pDistInfo->minRows > p1.minRows) {
pDistInfo->minRows = p1.minRows; pDistInfo->minRows = p1.minRows;
@ -5523,7 +5522,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1; if (tEncodeI32(&encoder, pInfo->defMaxRows) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1; if (tEncodeI32(&encoder, pInfo->defMinRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfInmemRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfSmallBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfSttRows) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
@ -5555,7 +5554,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->defMaxRows) < 0) return -1;
if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->defMinRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfInmemRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfSmallBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfSttRows) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) { for (int32_t i = 0; i < tListLen(pInfo->blockRowsHisto); ++i) {
@ -5589,7 +5588,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
} }
int32_t len = sprintf(st + VARSTR_HEADER_SIZE, int32_t len = sprintf(st + VARSTR_HEADER_SIZE,
"Total_Blocks=[%d] Total_Size=[%.2f KB] Average_size=[%.2f KB] Compression_Ratio=[%.2f %c]", "Total_Blocks=[%d] Total_Size=[%.2f KiB] Average_size=[%.2f KiB] Compression_Ratio=[%.2f %c]",
pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%'); pData->numOfBlocks, pData->totalSize / 1024.0, averageSize / 1024.0, compRatio, '%');
varDataSetLen(st, len); varDataSetLen(st, len);
@ -5600,14 +5599,16 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
avgRows = pData->totalRows / pData->numOfBlocks; avgRows = pData->totalRows / pData->numOfBlocks;
} }
len = sprintf(st + VARSTR_HEADER_SIZE, len = sprintf(st + VARSTR_HEADER_SIZE, "Block_Rows=[%" PRId64 "] MinRows=[%d] MaxRows=[%d] AvgRows=[%" PRId64 "]",
"Total_Rows=[%" PRId64 "] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%" PRId64 "]", pData->totalRows, pData->minRows, pData->maxRows, avgRows);
pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, avgRows);
varDataSetLen(st, len); varDataSetLen(st, len);
colDataSetVal(pColInfo, row++, st, false); colDataSetVal(pColInfo, row++, st, false);
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, len = sprintf(st + VARSTR_HEADER_SIZE, "Inmem_Rows=[%d] Stt_Rows=[%d] ", pData->numOfInmemRows, pData->numOfSttRows);
varDataSetLen(st, len);
colDataSetVal(pColInfo, row++, st, false);
len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Filesets=[%d] Total_Vgroups=[%d]", pData->numOfTables,
pData->numOfFiles, pData->numOfVgroups); pData->numOfFiles, pData->numOfVgroups);
varDataSetLen(st, len); varDataSetLen(st, len);