ehn(tsdb/cache): skip invalid row or data block directly with merge tree
This commit is contained in:
parent
b1949b5584
commit
a8d146c512
|
@ -600,6 +600,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
|||
int nCols) {
|
||||
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
bool checkRemainingRow = true;
|
||||
|
||||
switch (state->state) {
|
||||
case SFSLASTNEXTROW_FS:
|
||||
|
@ -632,6 +633,8 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
|||
if (code) goto _err;
|
||||
}
|
||||
|
||||
state->pLoadInfo->colIds = aCols;
|
||||
state->pLoadInfo->numOfCols = nCols;
|
||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
||||
|
@ -647,22 +650,71 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
|||
goto _next_fileset;
|
||||
}
|
||||
state->state = SFSLASTNEXTROW_BLOCKROW;
|
||||
checkRemainingRow = false;
|
||||
}
|
||||
case SFSLASTNEXTROW_BLOCKROW: {
|
||||
bool hasVal = false;
|
||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||
*ppRow = &state->row;
|
||||
hasVal = tMergeTreeNext(&state->mergeTree);
|
||||
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
||||
*pIgnoreEarlierTs = true;
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
}
|
||||
bool skipRow = false;
|
||||
do {
|
||||
bool hasVal = false;
|
||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||
*ppRow = &state->row;
|
||||
hasVal = tMergeTreeNext(&state->mergeTree);
|
||||
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
||||
*pIgnoreEarlierTs = true;
|
||||
*ppRow = NULL;
|
||||
return code;
|
||||
}
|
||||
|
||||
*pIgnoreEarlierTs = false;
|
||||
if (!hasVal) {
|
||||
state->state = SFSLASTNEXTROW_FILESET;
|
||||
}
|
||||
*pIgnoreEarlierTs = false;
|
||||
if (!hasVal) {
|
||||
state->state = SFSLASTNEXTROW_FILESET;
|
||||
goto _next_fileset;
|
||||
}
|
||||
|
||||
if (checkRemainingRow) {
|
||||
bool skipBlock = true;
|
||||
|
||||
SBlockData *pBlockData = state->row.pBlockData;
|
||||
|
||||
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
|
||||
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
||||
SColData *pColData = &pBlockData->aColData[colIndex];
|
||||
int16_t cid = pColData->cid;
|
||||
|
||||
if (cid == aCols[inputColIndex]) {
|
||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
||||
SColData *pColData = &pBlockData->aColData[colIndex];
|
||||
int16_t cid = pColData->cid;
|
||||
|
||||
if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
|
||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
}
|
||||
|
||||
++inputColIndex;
|
||||
}
|
||||
}
|
||||
*/
|
||||
if (skipBlock) {
|
||||
skipRow = true;
|
||||
}
|
||||
}
|
||||
} while (skipRow);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -908,10 +960,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
int16_t cid = pColData->cid;
|
||||
|
||||
if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
|
||||
if (isLast && pColData->numOfValue != 0) {
|
||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
} else if (pColData->numOfNone != pColData->nVal) {
|
||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue