Merge pull request #17684 from taosdata/fix/TD-19894
fix: continue to fetch next row if deleted
This commit is contained in:
commit
26aa0f1d37
|
@ -1093,72 +1093,71 @@ _err:
|
|||
// iterate next row non deleted backward ts, version (from high to low)
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
||||
int code = 0;
|
||||
for (;;) {
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pIter->input[i].pRow == NULL) {
|
||||
pIter->input[i].stop = true;
|
||||
pIter->input[i].next = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
// select maxpoint(s) from mem, imem, fs and last
|
||||
TSDBROW *max[4] = {0};
|
||||
int iMax[4] = {-1, -1, -1, -1};
|
||||
int nMax = 0;
|
||||
TSKEY maxKey = TSKEY_MIN;
|
||||
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
||||
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
||||
|
||||
// merging & deduplicating on client side
|
||||
if (maxKey <= key.ts) {
|
||||
if (maxKey < key.ts) {
|
||||
nMax = 0;
|
||||
maxKey = key.ts;
|
||||
if (pIter->input[i].pRow == NULL) {
|
||||
pIter->input[i].stop = true;
|
||||
pIter->input[i].next = false;
|
||||
}
|
||||
|
||||
iMax[nMax] = i;
|
||||
max[nMax++] = pIter->input[i].pRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete detection
|
||||
TSDBROW *merge[4] = {0};
|
||||
int iMerge[4] = {-1, -1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
|
||||
|
||||
bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
|
||||
if (!deleted) {
|
||||
iMerge[nMerge] = iMax[i];
|
||||
merge[nMerge++] = max[i];
|
||||
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
pIter->input[iMax[i]].next = deleted;
|
||||
// select maxpoint(s) from mem, imem, fs and last
|
||||
TSDBROW *max[4] = {0};
|
||||
int iMax[4] = {-1, -1, -1, -1};
|
||||
int nMax = 0;
|
||||
TSKEY maxKey = TSKEY_MIN;
|
||||
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
||||
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
||||
|
||||
// merging & deduplicating on client side
|
||||
if (maxKey <= key.ts) {
|
||||
if (maxKey < key.ts) {
|
||||
nMax = 0;
|
||||
maxKey = key.ts;
|
||||
}
|
||||
|
||||
iMax[nMax] = i;
|
||||
max[nMax++] = pIter->input[i].pRow;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete detection
|
||||
TSDBROW *merge[4] = {0};
|
||||
int iMerge[4] = {-1, -1, -1, -1};
|
||||
int nMerge = 0;
|
||||
for (int i = 0; i < nMax; ++i) {
|
||||
TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
|
||||
|
||||
bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
|
||||
if (!deleted) {
|
||||
iMerge[nMerge] = iMax[i];
|
||||
merge[nMerge++] = max[i];
|
||||
}
|
||||
|
||||
pIter->input[iMax[i]].next = deleted;
|
||||
}
|
||||
|
||||
if (nMerge > 0) {
|
||||
pIter->input[iMerge[0]].next = true;
|
||||
|
||||
*ppRow = merge[0];
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
if (nMerge > 0) {
|
||||
pIter->input[iMerge[0]].next = true;
|
||||
|
||||
*ppRow = merge[0];
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
return code;
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue