refactor: do some internal refactor.
This commit is contained in:
parent
81e54541de
commit
0b6e531dd3
|
@ -221,7 +221,7 @@ static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader,
|
||||||
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
|
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
|
||||||
int32_t rowIndex);
|
int32_t rowIndex);
|
||||||
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
|
||||||
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
|
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
|
||||||
SVersionRange* pVerRange);
|
SVersionRange* pVerRange);
|
||||||
|
|
||||||
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
|
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
|
||||||
|
@ -1871,11 +1871,13 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
TSDBKEY k = {.version = pRow->version, .ts = pRow->pBlockData->aTSKEY[pRow->iRow]};
|
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
|
||||||
pLastBlockReader->currentKey = k.ts;
|
int64_t ver = pRow->version;
|
||||||
pScanInfo->lastKeyInStt = k.ts;
|
|
||||||
|
|
||||||
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
|
pLastBlockReader->currentKey = key;
|
||||||
|
pScanInfo->lastKeyInStt = ver;
|
||||||
|
|
||||||
|
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
|
||||||
// the qualifed ts may equal to k.ts, only a greater version one.
|
// the qualifed ts may equal to k.ts, only a greater version one.
|
||||||
// here we need to fallback one step.
|
// here we need to fallback one step.
|
||||||
return true;
|
return true;
|
||||||
|
@ -2543,8 +2545,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDBKEY k = {.ts = ts, .version = ver};
|
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->order,
|
||||||
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order,
|
|
||||||
&pReader->verRange)) {
|
&pReader->verRange)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2584,10 +2585,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
|
||||||
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
|
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
|
||||||
}
|
}
|
||||||
|
|
||||||
//static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
|
|
||||||
// return pLastBlockReader->currentKey;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
|
||||||
|
|
||||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
|
@ -3562,7 +3559,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
|
||||||
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
|
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
|
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, SVersionRange* pVerRange) {
|
||||||
if (pDelList == NULL) {
|
if (pDelList == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3574,29 +3571,29 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
|
||||||
if (asc) {
|
if (asc) {
|
||||||
if (*index >= num - 1) {
|
if (*index >= num - 1) {
|
||||||
TSDBKEY* last = taosArrayGetLast(pDelList);
|
TSDBKEY* last = taosArrayGetLast(pDelList);
|
||||||
ASSERT(pKey->ts >= last->ts);
|
ASSERT(key >= last->ts);
|
||||||
|
|
||||||
if (pKey->ts > last->ts) {
|
if (key > last->ts) {
|
||||||
return false;
|
return false;
|
||||||
} else if (pKey->ts == last->ts) {
|
} else if (key == last->ts) {
|
||||||
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
|
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
|
||||||
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
|
return (prev->version >= ver && prev->version <= pVerRange->maxVer &&
|
||||||
prev->version >= pVerRange->minVer);
|
prev->version >= pVerRange->minVer);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
||||||
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
|
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
|
||||||
|
|
||||||
if (pKey->ts < pCurrent->ts) {
|
if (key < pCurrent->ts) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
|
if (pCurrent->ts <= key && pNext->ts >= key && pCurrent->version >= ver &&
|
||||||
pVerRange->maxVer >= pCurrent->version) {
|
pVerRange->maxVer >= pCurrent->version) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pNext->ts <= pKey->ts && (*index) < num - 1) {
|
while (pNext->ts <= key && (*index) < num - 1) {
|
||||||
(*index) += 1;
|
(*index) += 1;
|
||||||
|
|
||||||
if ((*index) < num - 1) {
|
if ((*index) < num - 1) {
|
||||||
|
@ -3608,7 +3605,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
|
if (pCurrent->ts <= key && pNext->ts >= key && pCurrent->version >= ver &&
|
||||||
pVerRange->maxVer >= pCurrent->version) {
|
pVerRange->maxVer >= pCurrent->version) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -3621,10 +3618,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
|
||||||
if (*index <= 0) {
|
if (*index <= 0) {
|
||||||
TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
|
TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
|
||||||
|
|
||||||
if (pKey->ts < pFirst->ts) {
|
if (key < pFirst->ts) {
|
||||||
return false;
|
return false;
|
||||||
} else if (pKey->ts == pFirst->ts) {
|
} else if (key == pFirst->ts) {
|
||||||
return pFirst->version >= pKey->version;
|
return pFirst->version >= ver;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -3632,15 +3629,15 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
|
||||||
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
|
||||||
TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);
|
TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);
|
||||||
|
|
||||||
if (pKey->ts > pCurrent->ts) {
|
if (key > pCurrent->ts) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
|
if (pPrev->ts <= key && pCurrent->ts >= key && pPrev->version >= ver) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pPrev->ts >= pKey->ts && (*index) > 1) {
|
while (pPrev->ts >= key && (*index) > 1) {
|
||||||
(*index) += step;
|
(*index) += step;
|
||||||
|
|
||||||
if ((*index) >= 1) {
|
if ((*index) >= 1) {
|
||||||
|
@ -3652,7 +3649,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
|
if (pPrev->ts <= key && pCurrent->ts >= key && pPrev->version >= ver) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3680,7 +3677,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
|
||||||
|
|
||||||
// it is a valid data version
|
// it is a valid data version
|
||||||
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
|
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
|
||||||
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
|
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3699,7 +3696,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
|
||||||
}
|
}
|
||||||
|
|
||||||
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
|
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
|
||||||
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
|
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue