Merge pull request #16668 from taosdata/feature/3_liaohj
fix(query): fix the invalid the check of last file blocks.
This commit is contained in:
commit
f261fe7837
|
@ -650,7 +650,7 @@ typedef struct SMergeTree {
|
||||||
SLDataIter *pIter;
|
SLDataIter *pIter;
|
||||||
} SMergeTree;
|
} SMergeTree;
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange);
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
|
|
|
@ -420,6 +420,7 @@ typedef enum {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SFSLASTNEXTROWSTATES state; // [input]
|
SFSLASTNEXTROWSTATES state; // [input]
|
||||||
STsdb *pTsdb; // [input]
|
STsdb *pTsdb; // [input]
|
||||||
|
tb_uid_t suid;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
int32_t nFileSet;
|
int32_t nFileSet;
|
||||||
int32_t iFileSet;
|
int32_t iFileSet;
|
||||||
|
@ -454,7 +455,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX});
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX});
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
|
@ -796,7 +797,7 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
|
||||||
if (key->ts > pItemBack->ts) {
|
if (key->ts > pItemBack->ts) {
|
||||||
return false;
|
return false;
|
||||||
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
|
} else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) {
|
||||||
if ((key->version <= pItemFront->version || key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
|
if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) {
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
return false;
|
return false;
|
||||||
|
@ -890,6 +891,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
||||||
pIter->fsLastState.pTsdb = pTsdb;
|
pIter->fsLastState.pTsdb = pTsdb;
|
||||||
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
||||||
|
pIter->fsLastState.suid = suid;
|
||||||
pIter->fsLastState.uid = uid;
|
pIter->fsLastState.uid = uid;
|
||||||
|
|
||||||
pIter->fsState.state = SFSNEXTROW_FS;
|
pIter->fsState.state = SFSNEXTROW_FS;
|
||||||
|
|
|
@ -40,8 +40,8 @@ static SBlockData *getNextBlock(SLDataIter *pIter) {
|
||||||
return getCurrentBlock(pIter);
|
return getCurrentBlock(pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t uid,
|
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pRange) {
|
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
|
||||||
if (*pIter == NULL) {
|
if (*pIter == NULL) {
|
||||||
|
@ -50,11 +50,11 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pIter)->uid = uid;
|
(*pIter)->uid = uid;
|
||||||
(*pIter)->timeWindow = *pTimeWindow;
|
|
||||||
(*pIter)->verRange = *pRange;
|
|
||||||
(*pIter)->pReader = pReader;
|
(*pIter)->pReader = pReader;
|
||||||
(*pIter)->iStt = iStt;
|
(*pIter)->iStt = iStt;
|
||||||
(*pIter)->backward = backward;
|
(*pIter)->backward = backward;
|
||||||
|
(*pIter)->verRange = *pRange;
|
||||||
|
(*pIter)->timeWindow = *pTimeWindow;
|
||||||
(*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
(*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
|
||||||
if ((*pIter)->aSttBlk == NULL) {
|
if ((*pIter)->aSttBlk == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -83,6 +83,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
if (!backward) { // asc
|
if (!backward) { // asc
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
||||||
|
if (p->suid != suid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||||
index = i;
|
index = i;
|
||||||
break;
|
break;
|
||||||
|
@ -91,6 +95,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
} else { // desc
|
} else { // desc
|
||||||
for (int32_t i = size - 1; i >= 0; --i) {
|
for (int32_t i = size - 1; i >= 0; --i) {
|
||||||
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i);
|
||||||
|
if (p->suid != suid) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (p->minUid <= uid && p->maxUid >= uid) {
|
if (p->minUid <= uid && p->maxUid >= uid) {
|
||||||
index = i;
|
index = i;
|
||||||
break;
|
break;
|
||||||
|
@ -130,9 +138,31 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check uid firstly
|
||||||
if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
|
if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) {
|
||||||
index = i;
|
if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->backward && p->maxKey < pIter->timeWindow.skey) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check time range secondly
|
||||||
|
if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) {
|
||||||
|
if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIter->backward && p->maxVer < pIter->verRange.minVer) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) {
|
||||||
|
index = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,14 +222,6 @@ static void findNextValidRow(SLDataIter *pIter) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle delete soon
|
|
||||||
#if 0
|
|
||||||
TSDBKEY k = {.ts = ts, .version = ver};
|
|
||||||
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
hasVal = true;
|
hasVal = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -290,7 +312,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid,
|
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
|
STimeWindow *pTimeWindow, SVersionRange *pVerRange) {
|
||||||
pMTree->backward = backward;
|
pMTree->backward = backward;
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
|
@ -304,7 +326,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
|
|
||||||
struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0};
|
struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0};
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange);
|
code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,16 +226,13 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 1:-1;
|
||||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||||
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||||
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
|
info.lastKey = pTsdbReader->window.skey - step;
|
||||||
info.lastKey = pTsdbReader->window.skey;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey);
|
|
||||||
} else {
|
} else {
|
||||||
info.lastKey = pTsdbReader->window.skey;
|
info.lastKey = pTsdbReader->window.ekey - step;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
||||||
|
@ -249,7 +246,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
return pTableMap;
|
return pTableMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void resetDataBlockScanInfo(SHashObj* pTableMap) {
|
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
||||||
STableBlockScanInfo* p = NULL;
|
STableBlockScanInfo* p = NULL;
|
||||||
|
|
||||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||||
|
@ -260,6 +257,7 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
|
p->lastKey = ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1430,6 +1428,10 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
|
|
||||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||||
|
|
||||||
|
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
|
tRowMerge(&merge, &fRow1);
|
||||||
|
|
||||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
|
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
|
||||||
|
|
||||||
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
|
int32_t code = tRowMergerGetRow(&merge, &pTSRow);
|
||||||
|
@ -1862,61 +1864,6 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
*(pLastBlockReader->rowIndex) += step;
|
|
||||||
|
|
||||||
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
|
|
||||||
for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
|
|
||||||
if (pBlockData->aUid != NULL) {
|
|
||||||
if (asc) {
|
|
||||||
if (pBlockData->aUid[i] < pLastBlockReader->uid) {
|
|
||||||
continue;
|
|
||||||
} else if (pBlockData->aUid[i] > pLastBlockReader->uid) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (pBlockData->aUid[i] > pLastBlockReader->uid) {
|
|
||||||
continue;
|
|
||||||
} else if (pBlockData->aUid[i] < pLastBlockReader->uid) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t ts = pBlockData->aTSKEY[i];
|
|
||||||
if (ts < pLastBlockReader->window.skey) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t ver = pBlockData->aVersion[i];
|
|
||||||
if (ver < pLastBlockReader->verRange.minVer) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// no data any more, todo opt handle desc case
|
|
||||||
if (ts > pLastBlockReader->window.ekey) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo opt handle desc case
|
|
||||||
if (ver > pLastBlockReader->verRange.maxVer) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
TSDBKEY k = {.ts = ts, .version = ver};
|
|
||||||
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
*(pLastBlockReader->rowIndex) = i;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// set all data is consumed in last block
|
|
||||||
setAllRowsChecked(pLastBlockReader);
|
|
||||||
return false;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo,
|
||||||
|
@ -1932,9 +1879,18 @@ static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockS
|
||||||
|
|
||||||
initMemDataIterator(pBlockScanInfo, pReader);
|
initMemDataIterator(pBlockScanInfo, pReader);
|
||||||
pLastBlockReader->uid = pBlockScanInfo->uid;
|
pLastBlockReader->uid = pBlockScanInfo->uid;
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1;
|
||||||
|
STimeWindow w = pLastBlockReader->window;
|
||||||
|
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
|
||||||
|
w.skey = pBlockScanInfo->lastKey + step;
|
||||||
|
} else {
|
||||||
|
w.ekey = pBlockScanInfo->lastKey + step;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code =
|
int32_t code =
|
||||||
tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader,
|
||||||
pBlockScanInfo->uid, &pLastBlockReader->window, &pLastBlockReader->verRange);
|
pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2537,22 +2493,24 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
||||||
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
|
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
|
||||||
if (hasNext) { // check for the next block in the block accessed order list
|
if (hasNext) { // check for the next block in the block accessed order list
|
||||||
initBlockDumpInfo(pReader, pBlockIter);
|
initBlockDumpInfo(pReader, pBlockIter);
|
||||||
} else if (hasDataInLastBlock(pReader->status.fileIter.pLastBlockReader)) {
|
|
||||||
// data blocks in current file are exhausted, let's try the next file now
|
|
||||||
tBlockDataReset(&pReader->status.fileBlockData);
|
|
||||||
resetDataBlockIterator(pBlockIter, pReader->order);
|
|
||||||
goto _begin;
|
|
||||||
} else {
|
} else {
|
||||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
if (pReader->status.pCurrentFileset->nSttF > 0) {
|
||||||
|
// data blocks in current file are exhausted, let's try the next file now
|
||||||
// error happens or all the data files are completely checked
|
tBlockDataReset(&pReader->status.fileBlockData);
|
||||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
resetDataBlockIterator(pBlockIter, pReader->order);
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// this file does not have blocks, let's start check the last block file
|
|
||||||
if (pBlockIter->numOfBlocks == 0) {
|
|
||||||
goto _begin;
|
goto _begin;
|
||||||
|
} else {
|
||||||
|
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||||
|
|
||||||
|
// error happens or all the data files are completely checked
|
||||||
|
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this file does not have blocks, let's start check the last block file
|
||||||
|
if (pBlockIter->numOfBlocks == 0) {
|
||||||
|
goto _begin;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2898,6 +2856,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
||||||
|
|
||||||
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
|
||||||
SRowMerger* pMerger) {
|
SRowMerger* pMerger) {
|
||||||
|
pScanInfo->lastKey = ts;
|
||||||
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) {
|
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) {
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
if (next1 == ts) {
|
if (next1 == ts) {
|
||||||
|
@ -3589,11 +3548,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||||
|
|
||||||
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
|
||||||
|
|
||||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||||
resetDataBlockScanInfo(pReader->status.pTableMap);
|
|
||||||
|
int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1;
|
||||||
|
resetDataBlockScanInfo(pReader->status.pTableMap, ts);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
|
Loading…
Reference in New Issue