fix(query): reset the del file index when beginning last file check.
This commit is contained in:
parent
4892740cb1
commit
1741e98a4e
|
@ -215,6 +215,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||||
|
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||||
|
|
||||||
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
|
||||||
|
|
||||||
|
@ -2526,6 +2527,14 @@ _end:
|
||||||
|
|
||||||
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
|
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
|
||||||
|
|
||||||
|
int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
|
||||||
|
if (pDelSkyline == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
|
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData,
|
||||||
STbData* piMemTbData) {
|
STbData* piMemTbData) {
|
||||||
if (pBlockScanInfo->delSkyline != NULL) {
|
if (pBlockScanInfo->delSkyline != NULL) {
|
||||||
|
@ -2543,7 +2552,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
||||||
if (pIdx != NULL) {
|
if (pIdx != NULL) {
|
||||||
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
|
code = tsdbReadDelData(pReader->pDelFReader, pIdx, pDelData);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -2572,11 +2580,13 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pDelData);
|
taosArrayDestroy(pDelData);
|
||||||
pBlockScanInfo->iter.index =
|
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, pReader->order);
|
||||||
ASCENDING_TRAVERSE(pReader->order) ? 0 : taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
|
|
||||||
pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
|
pBlockScanInfo->iter.index = index;
|
||||||
pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
|
pBlockScanInfo->iiter.index = index;
|
||||||
pBlockScanInfo->lastBlockDelIndex = pBlockScanInfo->iter.index;
|
pBlockScanInfo->fileDelIndex = index;
|
||||||
|
pBlockScanInfo->lastBlockDelIndex = index;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -2676,13 +2686,17 @@ static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
|
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||||
|
|
||||||
void* p = taosHashIterate(pStatus->pTableMap, NULL);
|
void* p = taosHashIterate(pStatus->pTableMap, NULL);
|
||||||
while (p != NULL) {
|
while (p != NULL) {
|
||||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
|
||||||
|
|
||||||
|
// reset the last del file index
|
||||||
|
pScanInfo->lastBlockDelIndex = getInitialDelIndex(pScanInfo->delSkyline, order);
|
||||||
|
|
||||||
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
|
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
|
||||||
p = taosHashIterate(pStatus->pTableMap, p);
|
p = taosHashIterate(pStatus->pTableMap, p);
|
||||||
}
|
}
|
||||||
|
@ -2690,7 +2704,9 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
|
||||||
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
|
taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) {
|
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
|
||||||
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||||
if (total == 0) {
|
if (total == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2703,7 +2719,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
|
extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);
|
||||||
uint64_t uid = pOrderCheckInfo->tableUidList[0];
|
uint64_t uid = pOrderCheckInfo->tableUidList[0];
|
||||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||||
} else {
|
} else {
|
||||||
|
@ -2720,7 +2736,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderSt
|
||||||
}
|
}
|
||||||
|
|
||||||
pOrderCheckInfo->tableUidList = p;
|
pOrderCheckInfo->tableUidList = p;
|
||||||
extractOrderedTableUidList(pOrderCheckInfo, pStatus);
|
extractOrderedTableUidList(pOrderCheckInfo, pStatus, pReader->order);
|
||||||
|
|
||||||
uid = pOrderCheckInfo->tableUidList[0];
|
uid = pOrderCheckInfo->tableUidList[0];
|
||||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||||
|
@ -2740,11 +2756,7 @@ static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus
|
||||||
|
|
||||||
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
|
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
|
||||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||||
if (pStatus->pTableIter == NULL) {
|
return (pStatus->pTableIter != NULL);
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
|
@ -2752,7 +2764,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
||||||
|
|
||||||
SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
|
SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
|
||||||
int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus);
|
int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
|
if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue