refactor(query): do some internal refactor.
This commit is contained in:
parent
c7350ef24d
commit
b2cf2818eb
|
@ -84,8 +84,6 @@ typedef struct SBlockLoadSuppInfo {
|
||||||
} SBlockLoadSuppInfo;
|
} SBlockLoadSuppInfo;
|
||||||
|
|
||||||
typedef struct SLastBlockReader {
|
typedef struct SLastBlockReader {
|
||||||
int32_t currentBlockIndex;
|
|
||||||
SBlockData lastBlockData;
|
|
||||||
STimeWindow window;
|
STimeWindow window;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
int32_t order;
|
int32_t order;
|
||||||
|
@ -345,11 +343,6 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet,
|
||||||
pLReader->order = pReader->order;
|
pLReader->order = pReader->order;
|
||||||
pLReader->window = pReader->window;
|
pLReader->window = pReader->window;
|
||||||
pLReader->verRange = pReader->verRange;
|
pLReader->verRange = pReader->verRange;
|
||||||
|
|
||||||
int32_t code = tBlockDataCreate(&pLReader->lastBlockData);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
|
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
|
||||||
|
@ -1269,15 +1262,13 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if ((pLastBlockReader->lastBlockData.nRow > 0) && hasDataInLastBlock(pLastBlockReader)) {
|
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||||
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBKEY k = TSDBROW_KEY(pRow);
|
TSDBKEY k = TSDBROW_KEY(pRow);
|
||||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||||
|
|
||||||
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
|
||||||
|
|
||||||
int64_t minKey = 0;
|
int64_t minKey = 0;
|
||||||
if (pReader->order == TSDB_ORDER_ASC) {
|
if (pReader->order == TSDB_ORDER_ASC) {
|
||||||
minKey = INT64_MAX; // chosen the minimum value
|
minKey = INT64_MAX; // chosen the minimum value
|
||||||
|
@ -1408,7 +1399,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
|
|
||||||
if (pBlockData->nRow > 0) {
|
if (pBlockData->nRow > 0) {
|
||||||
// no last block available, only data block exists
|
// no last block available, only data block exists
|
||||||
if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
|
if (!hasDataInLastBlock(pLastBlockReader)) {
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1458,7 +1449,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
ASSERT(pRow != NULL && piRow != NULL);
|
ASSERT(pRow != NULL && piRow != NULL);
|
||||||
|
|
||||||
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInLastBlock(pLastBlockReader)) {
|
if (hasDataInLastBlock(pLastBlockReader)) {
|
||||||
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
|
@ -1781,14 +1771,6 @@ static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid
|
||||||
}
|
}
|
||||||
|
|
||||||
pLastBlockReader->uid = uid;
|
pLastBlockReader->uid = uid;
|
||||||
if (*startPos == -1) {
|
|
||||||
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
|
|
||||||
// do nothing
|
|
||||||
} else {
|
|
||||||
*startPos = pLastBlockReader->lastBlockData.nRow;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*int32_t code = */ tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC),
|
/*int32_t code = */ tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC),
|
||||||
pFReader, uid, &pLastBlockReader->window, &pLastBlockReader->verRange);
|
pFReader, uid, &pLastBlockReader->window, &pLastBlockReader->verRange);
|
||||||
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree);
|
||||||
|
@ -2535,7 +2517,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
|
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
|
||||||
if (hasNext) { // check for the next block in the block accessed order list
|
if (hasNext) { // check for the next block in the block accessed order list
|
||||||
initBlockDumpInfo(pReader, pBlockIter);
|
initBlockDumpInfo(pReader, pBlockIter);
|
||||||
} else if (1/*taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0*/) {
|
} else if (hasDataInLastBlock(pReader->status.fileIter.pLastBlockReader)) {
|
||||||
// data blocks in current file are exhausted, let's try the next file now
|
// data blocks in current file are exhausted, let's try the next file now
|
||||||
tBlockDataReset(&pReader->status.fileBlockData);
|
tBlockDataReset(&pReader->status.fileBlockData);
|
||||||
resetDataBlockIterator(pBlockIter, pReader->order);
|
resetDataBlockIterator(pBlockIter, pReader->order);
|
||||||
|
@ -3316,6 +3298,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
|
taosMemoryFreeClear(pSupInfo->buildBuf[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pSupInfo->buildBuf);
|
taosMemoryFree(pSupInfo->buildBuf);
|
||||||
tBlockDataDestroy(&pReader->status.fileBlockData, true);
|
tBlockDataDestroy(&pReader->status.fileBlockData, true);
|
||||||
|
|
||||||
|
@ -3333,7 +3316,6 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
|
|
||||||
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
|
||||||
if (pFilesetIter->pLastBlockReader != NULL) {
|
if (pFilesetIter->pLastBlockReader != NULL) {
|
||||||
tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true);
|
|
||||||
taosMemoryFree(pFilesetIter->pLastBlockReader);
|
taosMemoryFree(pFilesetIter->pLastBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue