fix: continue to fetch next row if deleted
This commit is contained in:
parent
f3eb9c066a
commit
cb0a3ec042
|
@ -1093,72 +1093,71 @@ _err:
|
||||||
// iterate next row non deleted backward ts, version (from high to low)
|
// iterate next row non deleted backward ts, version (from high to low)
|
||||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
for (;;) {
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||||
|
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
for (int i = 0; i < 4; ++i) {
|
if (pIter->input[i].pRow == NULL) {
|
||||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
pIter->input[i].stop = true;
|
||||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow);
|
pIter->input[i].next = false;
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
if (pIter->input[i].pRow == NULL) {
|
|
||||||
pIter->input[i].stop = true;
|
|
||||||
pIter->input[i].next = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
|
|
||||||
*ppRow = NULL;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// select maxpoint(s) from mem, imem, fs and last
|
|
||||||
TSDBROW *max[4] = {0};
|
|
||||||
int iMax[4] = {-1, -1, -1, -1};
|
|
||||||
int nMax = 0;
|
|
||||||
TSKEY maxKey = TSKEY_MIN;
|
|
||||||
|
|
||||||
for (int i = 0; i < 4; ++i) {
|
|
||||||
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
|
||||||
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
|
||||||
|
|
||||||
// merging & deduplicating on client side
|
|
||||||
if (maxKey <= key.ts) {
|
|
||||||
if (maxKey < key.ts) {
|
|
||||||
nMax = 0;
|
|
||||||
maxKey = key.ts;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
iMax[nMax] = i;
|
|
||||||
max[nMax++] = pIter->input[i].pRow;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// delete detection
|
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
|
||||||
TSDBROW *merge[4] = {0};
|
*ppRow = NULL;
|
||||||
int iMerge[4] = {-1, -1, -1, -1};
|
return code;
|
||||||
int nMerge = 0;
|
|
||||||
for (int i = 0; i < nMax; ++i) {
|
|
||||||
TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
|
|
||||||
|
|
||||||
bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
|
|
||||||
if (!deleted) {
|
|
||||||
iMerge[nMerge] = iMax[i];
|
|
||||||
merge[nMerge++] = max[i];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pIter->input[iMax[i]].next = deleted;
|
// select maxpoint(s) from mem, imem, fs and last
|
||||||
|
TSDBROW *max[4] = {0};
|
||||||
|
int iMax[4] = {-1, -1, -1, -1};
|
||||||
|
int nMax = 0;
|
||||||
|
TSKEY maxKey = TSKEY_MIN;
|
||||||
|
|
||||||
|
for (int i = 0; i < 4; ++i) {
|
||||||
|
if (!pIter->input[i].stop && pIter->input[i].pRow != NULL) {
|
||||||
|
TSDBKEY key = TSDBROW_KEY(pIter->input[i].pRow);
|
||||||
|
|
||||||
|
// merging & deduplicating on client side
|
||||||
|
if (maxKey <= key.ts) {
|
||||||
|
if (maxKey < key.ts) {
|
||||||
|
nMax = 0;
|
||||||
|
maxKey = key.ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
iMax[nMax] = i;
|
||||||
|
max[nMax++] = pIter->input[i].pRow;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete detection
|
||||||
|
TSDBROW *merge[4] = {0};
|
||||||
|
int iMerge[4] = {-1, -1, -1, -1};
|
||||||
|
int nMerge = 0;
|
||||||
|
for (int i = 0; i < nMax; ++i) {
|
||||||
|
TSDBKEY maxKey1 = TSDBROW_KEY(max[i]);
|
||||||
|
|
||||||
|
bool deleted = tsdbKeyDeleted(&maxKey1, pIter->pSkyline, &pIter->iSkyline);
|
||||||
|
if (!deleted) {
|
||||||
|
iMerge[nMerge] = iMax[i];
|
||||||
|
merge[nMerge++] = max[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter->input[iMax[i]].next = deleted;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nMerge > 0) {
|
||||||
|
pIter->input[iMerge[0]].next = true;
|
||||||
|
|
||||||
|
*ppRow = merge[0];
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nMerge > 0) {
|
|
||||||
pIter->input[iMerge[0]].next = true;
|
|
||||||
|
|
||||||
*ppRow = merge[0];
|
|
||||||
} else {
|
|
||||||
*ppRow = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
_err:
|
_err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue