fix(query): handle delete duration generating data block.
This commit is contained in:
parent
5c2e98544d
commit
c339800611
|
@ -137,7 +137,7 @@ static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList,
|
||||||
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
|
||||||
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||||
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
|
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
|
||||||
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey);
|
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
|
||||||
|
|
||||||
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
|
||||||
STsdbReader* pReader);
|
STsdbReader* pReader);
|
||||||
|
@ -1416,7 +1416,7 @@ static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVer
|
||||||
(pBlock->minVersion <= pVerRange->maxVer);
|
(pBlock->minVersion <= pVerRange->maxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
|
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
|
||||||
if (pBlockScanInfo->delSkyline == NULL) {
|
if (pBlockScanInfo->delSkyline == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1429,9 +1429,11 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
|
||||||
|
|
||||||
// version is not overlap
|
// version is not overlap
|
||||||
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
|
||||||
for(int32_t i = pBlockScanInfo->fileDelIndex; i < num; ++i) {
|
for(int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += step) {
|
||||||
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
|
||||||
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
|
if (p->ts >= pBlock->minKey.ts && p->ts <= pBlock->maxKey.ts) {
|
||||||
if (p->version >= pBlock->minVersion) {
|
if (p->version >= pBlock->minVersion) {
|
||||||
|
@ -1464,7 +1466,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
|
||||||
|
|
||||||
// has duplicated ts of different version in this block
|
// has duplicated ts of different version in this block
|
||||||
bool hasDup = (pBlock->nSubBlock == 1)? pBlock->hasDup:true;
|
bool hasDup = (pBlock->nSubBlock == 1)? pBlock->hasDup:true;
|
||||||
bool overlapWithDel= overlapWithDelSkyline(pScanInfo, pBlock);
|
bool overlapWithDel= overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
|
||||||
|
|
||||||
return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
|
return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
|
||||||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
|
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel);
|
||||||
|
@ -1691,7 +1693,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBKEY k = {.ts = ts, .version = ver};
|
TSDBKEY k = {.ts = ts, .version = ver};
|
||||||
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k)) {
|
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2220,41 +2222,104 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
|
||||||
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
|
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey) {
|
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order) {
|
||||||
ASSERT(pKey != NULL);
|
ASSERT(pKey != NULL);
|
||||||
if (pDelList == NULL) {
|
if (pDelList == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
size_t num = taosArrayGetSize(pDelList);
|
||||||
|
bool asc = ASCENDING_TRAVERSE(order);
|
||||||
|
int32_t step = asc? 1:-1;
|
||||||
|
|
||||||
if (*index >= taosArrayGetSize(pDelList) - 1) {
|
if (asc) {
|
||||||
TSDBKEY* last = taosArrayGetLast(pDelList);
|
if (*index >= num - 1) {
|
||||||
if (pKey->ts > last->ts) {
|
TSDBKEY* last = taosArrayGetLast(pDelList);
|
||||||
return false;
|
ASSERT(pKey->ts >= last->ts);
|
||||||
} else if (pKey->ts == last->ts) {
|
|
||||||
size_t size = taosArrayGetSize(pDelList);
|
if (pKey->ts > last->ts) {
|
||||||
TSDBKEY* prev = taosArrayGet(pDelList, size - 2);
|
|
||||||
if (prev->version >= pKey->version) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
return false;
|
||||||
|
} else if (pKey->ts == last->ts) {
|
||||||
|
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
|
||||||
|
return (prev->version >= pKey->version);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
||||||
|
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
|
||||||
|
|
||||||
|
if (pKey->ts < pCurrent->ts) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (pNext->ts <= pKey->ts && (*index) < num - 1) {
|
||||||
|
(*index) += 1;
|
||||||
|
|
||||||
|
if ((*index) < num - 1) {
|
||||||
|
pCurrent = taosArrayGet(pDelList, *index);
|
||||||
|
pNext = taosArrayGet(pDelList, (*index) + 1);
|
||||||
|
|
||||||
|
// it is not a consecutive deletion range, ignore it
|
||||||
|
if (pCurrent->version == 0 && pNext->version > 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
if (*index <= 0) {
|
||||||
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
|
TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
|
||||||
|
|
||||||
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
|
if (pKey->ts < pFirst->ts) {
|
||||||
return true;
|
return false;
|
||||||
|
} else if (pKey->ts == pFirst->ts) {
|
||||||
|
return pFirst->version >= pKey->version;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
while (pNext->ts < pKey->ts && (*index) < taosArrayGetSize(pDelList) - 1) {
|
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
||||||
(*index) += 1;
|
TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);
|
||||||
|
|
||||||
|
if (pKey->ts > pCurrent->ts) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (pPrev->ts >= pKey->ts && (*index) > 1) {
|
||||||
|
(*index) += step;
|
||||||
|
|
||||||
|
if ((*index) >= 1) {
|
||||||
|
pCurrent = taosArrayGet(pDelList, *index);
|
||||||
|
pPrev = taosArrayGet(pDelList, (*index) - 1);
|
||||||
|
|
||||||
|
// it is not a consecutive deletion range, ignore it
|
||||||
|
if (pCurrent->version > 0 && pPrev->version == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
|
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
|
||||||
|
@ -2271,7 +2336,7 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
|
||||||
|
|
||||||
// it is a valid data version
|
// it is a valid data version
|
||||||
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
|
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
|
||||||
(!hasBeenDropped(pDelList, &pIter->index, &key))) {
|
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2290,7 +2355,7 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
|
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
|
||||||
(!hasBeenDropped(pDelList, &pIter->index, &key))) {
|
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order))) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue