fix(tsdb/cache): skip remaining rows in current data block
This commit is contained in:
parent
6e0343fc08
commit
b1949b5584
|
@ -745,6 +745,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
int nCols) {
|
int nCols) {
|
||||||
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
bool checkRemainingRow = true;
|
||||||
|
|
||||||
switch (state->state) {
|
switch (state->state) {
|
||||||
case SFSNEXTROW_FS:
|
case SFSNEXTROW_FS:
|
||||||
|
@ -861,16 +862,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
|
|
||||||
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
|
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
|
||||||
SColData *pColData = &state->pBlockData->aColData[colIndex];
|
SColData *pColData = &state->pBlockData->aColData[colIndex];
|
||||||
int16_t cid = pColData->cid;
|
|
||||||
|
|
||||||
if (inputColIndex < nCols && cid == aCols[inputColIndex++]) {
|
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||||
if (isLast && pColData->numOfValue != 0) {
|
skipBlock = false;
|
||||||
skipBlock = false;
|
break;
|
||||||
break;
|
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||||
} else if (pColData->numOfNone != pColData->nVal) {
|
skipBlock = false;
|
||||||
skipBlock = false;
|
break;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -899,8 +897,51 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
state->iRow = state->nRow - 1;
|
state->iRow = state->nRow - 1;
|
||||||
|
|
||||||
state->state = SFSNEXTROW_BLOCKROW;
|
state->state = SFSNEXTROW_BLOCKROW;
|
||||||
|
checkRemainingRow = false;
|
||||||
}
|
}
|
||||||
case SFSNEXTROW_BLOCKROW:
|
case SFSNEXTROW_BLOCKROW: {
|
||||||
|
if (checkRemainingRow) {
|
||||||
|
bool skipBlock = true;
|
||||||
|
int inputColIndex = 0;
|
||||||
|
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
|
||||||
|
SColData *pColData = &state->pBlockData->aColData[colIndex];
|
||||||
|
int16_t cid = pColData->cid;
|
||||||
|
|
||||||
|
if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
|
||||||
|
if (isLast && pColData->numOfValue != 0) {
|
||||||
|
skipBlock = false;
|
||||||
|
break;
|
||||||
|
} else if (pColData->numOfNone != pColData->nVal) {
|
||||||
|
skipBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
++inputColIndex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (skipBlock) {
|
||||||
|
if (--state->iBlock < 0) {
|
||||||
|
tsdbDataFReaderClose(state->pDataFReader);
|
||||||
|
*state->pDataFReader = NULL;
|
||||||
|
// resetLastBlockLoadInfo(state->pLoadInfo);
|
||||||
|
|
||||||
|
if (state->aBlockIdx) {
|
||||||
|
// taosArrayDestroy(state->aBlockIdx);
|
||||||
|
tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
|
||||||
|
|
||||||
|
state->aBlockIdxHandle = NULL;
|
||||||
|
state->aBlockIdx = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
state->state = SFSNEXTROW_FILESET;
|
||||||
|
goto _next_fileset;
|
||||||
|
} else {
|
||||||
|
goto _next_datablock;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (state->iRow >= 0) {
|
if (state->iRow >= 0) {
|
||||||
state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
|
state->row = tsdbRowFromBlockData(state->pBlockData, state->iRow);
|
||||||
*ppRow = &state->row;
|
*ppRow = &state->row;
|
||||||
|
@ -926,6 +967,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue