fix(query): load del info with upper version limitation.
This commit is contained in:
parent
cca69f3038
commit
b5b4cd2a05
|
@ -297,7 +297,7 @@ int32_t tsdbUpdateDelFileHdr(SDelFWriter *pWriter);
|
|||
// SDelFReader
|
||||
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb);
|
||||
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer);
|
||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
|
||||
// tsdbRead.c ==============================================================================================
|
||||
int32_t tsdbTakeReadSnap(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
|
||||
|
|
|
@ -1454,7 +1454,7 @@ static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelI
|
|||
int32_t code = 0;
|
||||
|
||||
if (pDelIdx) {
|
||||
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData);
|
||||
code = tsdbReadDelData(pDelReader, pDelIdx, aDelData, INT64_MAX);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -266,7 +266,7 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDel
|
|||
suid = pDelIdx->suid;
|
||||
uid = pDelIdx->uid;
|
||||
|
||||
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
|
||||
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData, INT64_MAX);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
taosArrayClear(pCommitter->aDelData);
|
||||
|
|
|
@ -412,7 +412,7 @@ static int32_t tsdbTombFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo*
|
|||
}
|
||||
}
|
||||
|
||||
code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData);
|
||||
code = tsdbReadDelData(pIter->tIter.pReader, pDelIdx, pIter->tIter.aDelData, INT64_MAX);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pIter->delInfo.suid = pDelIdx->suid;
|
||||
|
|
|
@ -2967,7 +2967,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
SDelIdx* pIdx = taosArraySearch(pReader->pDelIdx, &idx, tCmprDelIdx, TD_EQ);
|
||||
|
||||
if (pIdx != NULL) {
|
||||
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
|
||||
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData, pReader->verRange.maxVer);
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
|
@ -2978,7 +2978,10 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
if (pMemTbData != NULL) {
|
||||
p = pMemTbData->pHead;
|
||||
while (p) {
|
||||
taosArrayPush(pDelData, p);
|
||||
if (p->version <= pReader->verRange.maxVer) {
|
||||
taosArrayPush(pDelData, p);
|
||||
}
|
||||
|
||||
p = p->pNext;
|
||||
}
|
||||
}
|
||||
|
@ -2986,7 +2989,9 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
|||
if (piMemTbData != NULL) {
|
||||
p = piMemTbData->pHead;
|
||||
while (p) {
|
||||
taosArrayPush(pDelData, p);
|
||||
if (p->version <= pReader->verRange.maxVer) {
|
||||
taosArrayPush(pDelData, p);
|
||||
}
|
||||
p = p->pNext;
|
||||
}
|
||||
}
|
||||
|
@ -4558,7 +4563,11 @@ int32_t tsdbReaderOpen(void* pVnode, SQueryTableDataCond* pCond, void* pTableLis
|
|||
|
||||
pReader->pIgnoreTables = pIgnoreTables;
|
||||
|
||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
||||
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
|
||||
" in this query %s",
|
||||
pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->verRange.minVer,
|
||||
pReader->verRange.maxVer, pReader->idStr);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
|
|
|
@ -1488,7 +1488,7 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
||||
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
|
||||
int32_t code = 0;
|
||||
int64_t offset = pDelIdx->offset;
|
||||
int64_t size = pDelIdx->size;
|
||||
|
@ -1510,11 +1510,15 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
|
|||
SDelData delData;
|
||||
n += tGetDelData(pReader->aBuf[0] + n, &delData);
|
||||
|
||||
if (taosArrayPush(aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
if (delData.version > maxVer) {
|
||||
continue;
|
||||
}
|
||||
if (taosArrayPush(aDelData, &delData) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n == size);
|
||||
|
||||
return code;
|
||||
|
|
|
@ -1166,7 +1166,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
|
|||
|
||||
int32_t c = tTABLEIDCmprFn(pDelIdx, &pWriter->tbid);
|
||||
if (c < 0) {
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData);
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->pTIter->tIter.aDelData, INT64_MAX);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
SDelIdx* pDelIdxNew = taosArrayReserve(pWriter->aDelIdx, 1);
|
||||
|
@ -1183,7 +1183,7 @@ static int32_t tsdbSnapWriteDelTableDataStart(STsdbSnapWriter* pWriter, TABLEID*
|
|||
|
||||
pWriter->pTIter->tIter.iDelIdx++;
|
||||
} else if (c == 0) {
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData);
|
||||
code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData, INT64_MAX);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
pWriter->pTIter->tIter.iDelIdx++;
|
||||
|
|
|
@ -333,7 +333,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver);
|
||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, type:%d, cursor advance", pReader->pWal->cfg.vgId, ver, pReader->pHead->head.msgType);
|
||||
pReader->curVersion = ver + 1;
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue