fix(tsdb): do some internal refactor.
This commit is contained in:
parent
4e5853d9fe
commit
29a1c6d518
|
@ -23,14 +23,14 @@
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
|
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
|
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
|
||||||
|
|
||||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
||||||
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
|
||||||
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
|
||||||
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
|
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
|
||||||
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
|
||||||
|
@ -52,7 +52,7 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32
|
||||||
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
|
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
|
||||||
int8_t* pLevel);
|
int8_t* pLevel);
|
||||||
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
|
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
|
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader);
|
||||||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||||
|
@ -138,16 +138,16 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
|
||||||
pIter->pFilesetList = pFileSetArray;
|
pIter->pFilesetList = pFileSetArray;
|
||||||
pIter->numOfFiles = numOfFileset;
|
pIter->numOfFiles = numOfFileset;
|
||||||
|
|
||||||
if (pIter->pLastBlockReader == NULL) {
|
if (pIter->pSttBlockReader == NULL) {
|
||||||
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
|
pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader));
|
||||||
if (pIter->pLastBlockReader == NULL) {
|
if (pIter->pSttBlockReader == NULL) {
|
||||||
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
|
tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SLastBlockReader* pLReader = pIter->pLastBlockReader;
|
SSttBlockReader* pLReader = pIter->pSttBlockReader;
|
||||||
pLReader->order = pReader->info.order;
|
pLReader->order = pReader->info.order;
|
||||||
pLReader->window = pReader->info.window;
|
pLReader->window = pReader->info.window;
|
||||||
pLReader->verRange = pReader->info.verRange;
|
pLReader->verRange = pReader->info.verRange;
|
||||||
|
@ -171,8 +171,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
|
||||||
|
|
||||||
SReadCostSummary* pCost = &pReader->cost;
|
SReadCostSummary* pCost = &pReader->cost;
|
||||||
|
|
||||||
pIter->pLastBlockReader->uid = 0;
|
pIter->pSttBlockReader->uid = 0;
|
||||||
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
|
tMergeTreeClose(&pIter->pSttBlockReader->mergeTree);
|
||||||
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
|
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
|
||||||
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
|
||||||
|
@ -1404,26 +1404,26 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
|
static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo,
|
||||||
SVersionRange* pVerRange) {
|
SVersionRange* pVerRange) {
|
||||||
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pSttBlockReader->order) ? 1 : -1;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
bool hasVal = tMergeTreeNext(&pSttBlockReader->mergeTree);
|
||||||
if (!hasVal) { // the next value will be the accessed key in stt
|
if (!hasVal) { // the next value will be the accessed key in stt
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_NO_DATA;
|
||||||
pScanInfo->sttKeyInfo.nextProcKey += step;
|
pScanInfo->sttKeyInfo.nextProcKey += step;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
|
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
|
||||||
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
|
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
|
||||||
|
|
||||||
pLastBlockReader->currentKey = key;
|
pSttBlockReader->currentKey = key;
|
||||||
pScanInfo->sttKeyInfo.nextProcKey = key;
|
pScanInfo->sttKeyInfo.nextProcKey = key;
|
||||||
|
|
||||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pLastBlockReader->order,
|
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->sttBlockDelIndex, key, ver, pSttBlockReader->order,
|
||||||
pVerRange)) {
|
pVerRange)) {
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
pScanInfo->sttKeyInfo.status = STT_FILE_HAS_DATA;
|
||||||
return true;
|
return true;
|
||||||
|
@ -1431,26 +1431,26 @@ static bool nextRowFromSttBlocks(SLastBlockReader* pLastBlockReader, STableBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doPinSttBlock(SLastBlockReader* pLastBlockReader) {
|
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) {
|
||||||
tMergeTreePinSttBlock(&pLastBlockReader->mergeTree);
|
tMergeTreePinSttBlock(&pSttBlockReader->mergeTree);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) {
|
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) {
|
||||||
tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree);
|
tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
|
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
|
||||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
|
||||||
bool* copied) {
|
bool* copied) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
*copied = false;
|
*copied = false;
|
||||||
|
|
||||||
// avoid the fetch next row replace the referenced stt block in buffer
|
// avoid the fetch next row replace the referenced stt block in buffer
|
||||||
doPinSttBlock(pLastBlockReader);
|
doPinSttBlock(pSttBlockReader);
|
||||||
bool hasVal = nextRowFromSttBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
|
bool hasVal = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
|
||||||
doUnpinSttBlock(pLastBlockReader);
|
doUnpinSttBlock(pSttBlockReader);
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if (next1 != ts) {
|
if (next1 != ts) {
|
||||||
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -1507,15 +1507,15 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
||||||
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
|
SIterInfo* pIter, int64_t key, SSttBlockReader* pSttBlockReader) {
|
||||||
SRowMerger* pMerger = &pReader->status.merger;
|
SRowMerger* pMerger = &pReader->status.merger;
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
if (hasDataInSttBlock(pSttBlockReader)) {
|
||||||
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
|
@ -1533,7 +1533,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
int64_t minKey = 0;
|
int64_t minKey = 0;
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
minKey = INT64_MAX; // chosen the minimum value
|
minKey = INT64_MAX; // chosen the minimum value
|
||||||
if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
|
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1546,7 +1546,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = INT64_MIN;
|
minKey = INT64_MIN;
|
||||||
if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
|
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1571,12 +1571,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1621,12 +1621,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* fRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1652,27 +1652,27 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
|
static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, STsdbReader* pReader,
|
||||||
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
||||||
bool mergeBlockData) {
|
bool mergeBlockData) {
|
||||||
SRowMerger* pMerger = &pReader->status.merger;
|
SRowMerger* pMerger = &pReader->status.merger;
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
bool copied = false;
|
bool copied = false;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
|
|
||||||
// create local variable to hold the row value
|
// create local variable to hold the row value
|
||||||
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
|
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
|
||||||
|
|
||||||
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid,
|
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", ts:%" PRId64 " %s", pRow->pBlockData, pRow->iRow, pSttBlockReader->uid,
|
||||||
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
|
fRow.pBlockData->aTSKEY[fRow.iRow], pReader->idStr);
|
||||||
|
|
||||||
// only stt block exists
|
// only stt block exists
|
||||||
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
||||||
code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
|
code = tryCopyDistinctRowFromSttBlock(&fRow, pSttBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1686,9 +1686,9 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||||
|
@ -1711,7 +1711,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
|
|
||||||
// merge with block data if ts == key
|
// merge with block data if ts == key
|
||||||
|
@ -1737,8 +1737,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
|
static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* pSttBlockReader, int64_t key,
|
||||||
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
SRowMerger* pMerger = &pReader->status.merger;
|
SRowMerger* pMerger = &pReader->status.merger;
|
||||||
|
|
||||||
|
@ -1753,24 +1753,24 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
|
|
||||||
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
if (hasDataInFileBlock(pBlockData, pDumpInfo)) {
|
||||||
// no last block available, only data block exists
|
// no last block available, only data block exists
|
||||||
if (!hasDataInLastBlock(pLastBlockReader)) {
|
if (!hasDataInSttBlock(pSttBlockReader)) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
// row in last file block
|
// row in last file block
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
||||||
if (key < tsLast) {
|
if (key < tsLast) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
} else if (key > tsLast) {
|
} else if (key > tsLast) {
|
||||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (key > tsLast) {
|
if (key > tsLast) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
} else if (key < tsLast) {
|
} else if (key < tsLast) {
|
||||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// the following for key == tsLast
|
// the following for key == tsLast
|
||||||
|
@ -1782,13 +1782,13 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
|
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
|
||||||
|
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
|
||||||
|
|
||||||
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1802,12 +1802,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
} else { // only last block exists
|
} else { // only last block exists
|
||||||
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
|
return doMergeFileBlockAndLastBlock(pSttBlockReader, pReader, pBlockScanInfo, NULL, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
|
||||||
SLastBlockReader* pLastBlockReader) {
|
SSttBlockReader* pSttBlockReader) {
|
||||||
SRowMerger* pMerger = &pReader->status.merger;
|
SRowMerger* pMerger = &pReader->status.merger;
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1818,8 +1818,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
if (hasDataInSttBlock(pSttBlockReader)) {
|
||||||
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
|
||||||
|
@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
|
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1884,7 +1884,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey < tsLast && hasDataInLastBlock(pLastBlockReader)) {
|
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1903,13 +1903,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1962,13 +1962,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey == tsLast) {
|
if (minKey == tsLast) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
code = tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
|
||||||
pReader->idStr);
|
pReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2087,10 +2087,10 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
static bool initLastBlockReader(SSttBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
|
||||||
// the last block reader has been initialized for this table.
|
// the last block reader has been initialized for this table.
|
||||||
if (pLBlockReader->uid == pScanInfo->uid) {
|
if (pLBlockReader->uid == pScanInfo->uid) {
|
||||||
return hasDataInLastBlock(pLBlockReader);
|
return hasDataInSttBlock(pLBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pLBlockReader->uid != 0) {
|
if (pLBlockReader->uid != 0) {
|
||||||
|
@ -2145,7 +2145,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
|
||||||
|
|
||||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
||||||
|
@ -2201,7 +2201,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
|
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo,
|
||||||
SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
|
SBlockData* pBlockData, SSttBlockReader* pSttBlockReader) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
TSDBROW *pRow = NULL, *piRow = NULL;
|
TSDBROW *pRow = NULL, *piRow = NULL;
|
||||||
|
@ -2218,21 +2218,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
||||||
|
|
||||||
// two levels of mem-table does contain the valid rows
|
// two levels of mem-table does contain the valid rows
|
||||||
if (pRow != NULL && piRow != NULL) {
|
if (pRow != NULL && piRow != NULL) {
|
||||||
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
// imem + file + last block
|
// imem + file + last block
|
||||||
if (pBlockScanInfo->iiter.hasVal) {
|
if (pBlockScanInfo->iiter.hasVal) {
|
||||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
|
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
// mem + file + last block
|
// mem + file + last block
|
||||||
if (pBlockScanInfo->iter.hasVal) {
|
if (pBlockScanInfo->iter.hasVal) {
|
||||||
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
|
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
// files data blocks + last block
|
// files data blocks + last block
|
||||||
return mergeFileBlockAndLastBlock(pReader, pLastBlockReader, key, pBlockScanInfo, pBlockData);
|
return mergeFileBlockAndSttBlock(pReader, pSttBlockReader, key, pBlockScanInfo, pBlockData);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
|
||||||
|
@ -2293,7 +2293,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
|
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||||
|
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -2342,7 +2342,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
SBlockData* pBlockData = &pReader->status.fileBlockData;
|
||||||
initLastBlockReader(pLastBlockReader, pBlockScanInfo, pReader);
|
initLastBlockReader(pSttBlockReader, pBlockScanInfo, pReader);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasBlockData = false;
|
bool hasBlockData = false;
|
||||||
|
@ -2376,7 +2376,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pSttBlockReader);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -2561,9 +2561,9 @@ static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pSt
|
||||||
return (pStatus->pTableIter != NULL);
|
return (pStatus->pTableIter != NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader;
|
||||||
STableUidList* pUidList = &pStatus->uidList;
|
STableUidList* pUidList = &pStatus->uidList;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -2601,7 +2601,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
bool hasDataInLastFile = initLastBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
if (!hasDataInLastFile) {
|
if (!hasDataInLastFile) {
|
||||||
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
bool hasNexTable = moveToNextTable(pUidList, pStatus);
|
||||||
if (!hasNexTable) {
|
if (!hasNexTable) {
|
||||||
|
@ -2613,14 +2613,14 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
while (1) {
|
while (1) {
|
||||||
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
|
bool hasBlockLData = hasDataInSttBlock(pSttBlockReader);
|
||||||
|
|
||||||
// no data in last block and block, no need to proceed.
|
// no data in last block and block, no need to proceed.
|
||||||
if (hasBlockLData == false) {
|
if (hasBlockLData == false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2667,7 +2667,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||||
STableBlockScanInfo* pScanInfo = NULL;
|
STableBlockScanInfo* pScanInfo = NULL;
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
|
||||||
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
|
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
|
||||||
|
@ -2685,7 +2685,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) {
|
||||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
initLastBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
|
||||||
|
@ -2730,13 +2730,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
// let's load data from stt files
|
// let's load data from stt files
|
||||||
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
initLastBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
|
||||||
// no data in last block, no need to proceed.
|
// no data in last block, no need to proceed.
|
||||||
while (hasDataInLastBlock(pLastBlockReader)) {
|
while (hasDataInSttBlock(pSttBlockReader)) {
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
||||||
|
|
||||||
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2746,7 +2746,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
// data in stt now overlaps with current active file data block, need to composed with file data block.
|
||||||
int64_t lastKeyInStt = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t lastKeyInStt = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) ||
|
if ((lastKeyInStt >= pBlockInfo->record.firstKey && asc) ||
|
||||||
(lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) {
|
(lastKeyInStt <= pBlockInfo->record.lastKey && (!asc))) {
|
||||||
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
tsdbDebug("%p lastKeyInStt:%" PRId64 ", overlap with file block, brange:%" PRId64 "-%" PRId64 " %s", pReader,
|
||||||
|
@ -2816,11 +2816,11 @@ _end:
|
||||||
|
|
||||||
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
|
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
|
||||||
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
|
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
|
||||||
#if 0
|
#if 0
|
||||||
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
|
||||||
pBlockLoadInfo = &pLastBlockReader->pInfo[i];
|
pBlockLoadInfo = &pSttBlockReader->pInfo[i];
|
||||||
|
|
||||||
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
|
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -3020,7 +3020,7 @@ typedef enum {
|
||||||
TSDB_READ_CONTINUE = 0x2,
|
TSDB_READ_CONTINUE = 0x2,
|
||||||
} ERetrieveType;
|
} ERetrieveType;
|
||||||
|
|
||||||
static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
|
static ERetrieveType doReadDataFromSttFiles(STsdbReader* pReader) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
@ -3030,7 +3030,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
|
||||||
while (1) {
|
while (1) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
code = doLoadLastBlockSequentially(pReader);
|
code = doLoadSttBlockSequentially(pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return TSDB_READ_RETURN;
|
return TSDB_READ_RETURN;
|
||||||
|
@ -3067,7 +3067,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
|
|
||||||
if (pBlockIter->numOfBlocks == 0) {
|
if (pBlockIter->numOfBlocks == 0) {
|
||||||
// let's try to extract data from stt files.
|
// let's try to extract data from stt files.
|
||||||
ERetrieveType type = doReadDataFromLastFiles(pReader);
|
ERetrieveType type = doReadDataFromSttFiles(pReader);
|
||||||
if (type == TSDB_READ_RETURN) {
|
if (type == TSDB_READ_RETURN) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -3102,7 +3102,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
resetDataBlockIterator(pBlockIter, pReader->info.order);
|
resetDataBlockIterator(pBlockIter, pReader->info.order);
|
||||||
resetTableListIndex(&pReader->status);
|
resetTableListIndex(&pReader->status);
|
||||||
|
|
||||||
ERetrieveType type = doReadDataFromLastFiles(pReader);
|
ERetrieveType type = doReadDataFromSttFiles(pReader);
|
||||||
if (type == TSDB_READ_RETURN) {
|
if (type == TSDB_READ_RETURN) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -3434,12 +3434,12 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||||
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
SRowMerger* pMerger, SVersionRange* pVerRange, const char* idStr) {
|
||||||
while (nextRowFromSttBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
|
while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pVerRange)) {
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
if (next1 == ts) {
|
if (next1 == ts) {
|
||||||
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
|
||||||
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
tsdbRowMergerAdd(pMerger, pRow1, NULL);
|
||||||
} else {
|
} else {
|
||||||
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
|
||||||
|
@ -4097,8 +4097,8 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
||||||
|
|
||||||
SReadCostSummary* pCost = &pReader->cost;
|
SReadCostSummary* pCost = &pReader->cost;
|
||||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||||
if (pFilesetIter->pLastBlockReader != NULL) {
|
if (pFilesetIter->pSttBlockReader != NULL) {
|
||||||
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
|
SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader;
|
||||||
tMergeTreeClose(&pLReader->mergeTree);
|
tMergeTreeClose(&pLReader->mergeTree);
|
||||||
taosMemoryFree(pLReader);
|
taosMemoryFree(pLReader);
|
||||||
}
|
}
|
||||||
|
@ -4976,7 +4976,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
|
||||||
void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
|
||||||
taosMemoryFreeClear(pReader->idStr);
|
taosMemoryFreeClear(pReader->idStr);
|
||||||
pReader->idStr = taosStrdup(idstr);
|
pReader->idStr = taosStrdup(idstr);
|
||||||
pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr;
|
pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
|
||||||
|
|
|
@ -145,21 +145,21 @@ typedef struct SBlockLoadSuppInfo {
|
||||||
bool smaValid; // the sma on all queried columns are activated
|
bool smaValid; // the sma on all queried columns are activated
|
||||||
} SBlockLoadSuppInfo;
|
} SBlockLoadSuppInfo;
|
||||||
|
|
||||||
typedef struct SLastBlockReader {
|
typedef struct SSttBlockReader {
|
||||||
STimeWindow window;
|
STimeWindow window;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
int32_t order;
|
int32_t order;
|
||||||
uint64_t uid;
|
uint64_t uid;
|
||||||
SMergeTree mergeTree;
|
SMergeTree mergeTree;
|
||||||
int64_t currentKey;
|
int64_t currentKey;
|
||||||
} SLastBlockReader;
|
} SSttBlockReader;
|
||||||
|
|
||||||
typedef struct SFilesetIter {
|
typedef struct SFilesetIter {
|
||||||
int32_t numOfFiles; // number of total files
|
int32_t numOfFiles; // number of total files
|
||||||
int32_t index; // current accessed index in the list
|
int32_t index; // current accessed index in the list
|
||||||
TFileSetArray* pFilesetList; // data file set list
|
TFileSetArray* pFilesetList; // data file set list
|
||||||
int32_t order;
|
int32_t order;
|
||||||
SLastBlockReader* pLastBlockReader; // last file block reader
|
SSttBlockReader* pSttBlockReader; // last file block reader
|
||||||
} SFilesetIter;
|
} SFilesetIter;
|
||||||
|
|
||||||
typedef struct SFileDataBlockInfo {
|
typedef struct SFileDataBlockInfo {
|
||||||
|
|
Loading…
Reference in New Issue