enh(tsdb/cache): skip data block with merge tree
This commit is contained in:
parent
ea0cc2c584
commit
0b31aac2c5
|
@ -687,6 +687,8 @@ typedef struct SSttBlockLoadInfo {
|
||||||
STSchema *pSchema;
|
STSchema *pSchema;
|
||||||
int16_t *colIds;
|
int16_t *colIds;
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
|
bool checkRemainingRow;
|
||||||
|
bool isLast;
|
||||||
bool sttBlockLoaded;
|
bool sttBlockLoaded;
|
||||||
int32_t numOfStt;
|
int32_t numOfStt;
|
||||||
|
|
||||||
|
|
|
@ -590,6 +590,7 @@ typedef struct {
|
||||||
SDataFReader **pDataFReader;
|
SDataFReader **pDataFReader;
|
||||||
TSDBROW row;
|
TSDBROW row;
|
||||||
|
|
||||||
|
bool checkRemainingRow;
|
||||||
SMergeTree mergeTree;
|
SMergeTree mergeTree;
|
||||||
SMergeTree *pMergeTree;
|
SMergeTree *pMergeTree;
|
||||||
SSttBlockLoadInfo *pLoadInfo;
|
SSttBlockLoadInfo *pLoadInfo;
|
||||||
|
@ -600,7 +601,6 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
int nCols) {
|
int nCols) {
|
||||||
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool checkRemainingRow = true;
|
|
||||||
|
|
||||||
switch (state->state) {
|
switch (state->state) {
|
||||||
case SFSLASTNEXTROW_FS:
|
case SFSLASTNEXTROW_FS:
|
||||||
|
@ -633,8 +633,10 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
state->pLoadInfo->colIds = aCols;
|
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
|
||||||
state->pLoadInfo->numOfCols = nCols;
|
state->pLoadInfo[i].colIds = aCols;
|
||||||
|
state->pLoadInfo[i].numOfCols = nCols;
|
||||||
|
}
|
||||||
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
|
||||||
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
|
||||||
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
|
||||||
|
@ -650,17 +652,18 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
goto _next_fileset;
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
state->state = SFSLASTNEXTROW_BLOCKROW;
|
state->state = SFSLASTNEXTROW_BLOCKROW;
|
||||||
checkRemainingRow = false;
|
|
||||||
}
|
}
|
||||||
case SFSLASTNEXTROW_BLOCKROW: {
|
case SFSLASTNEXTROW_BLOCKROW: {
|
||||||
bool skipRow = false;
|
|
||||||
do {
|
|
||||||
bool hasVal = false;
|
bool hasVal = false;
|
||||||
state->row = tMergeTreeGetRow(&state->mergeTree);
|
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||||
*ppRow = &state->row;
|
*ppRow = &state->row;
|
||||||
if (nCols != state->pLoadInfo->numOfCols) {
|
if (nCols != state->pLoadInfo->numOfCols) {
|
||||||
state->pLoadInfo->numOfCols = nCols;
|
for (int i = 0; i < state->pLoadInfo->numOfStt; ++i) {
|
||||||
|
state->pLoadInfo[i].numOfCols = nCols;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
state->pLoadInfo->checkRemainingRow = state->checkRemainingRow;
|
||||||
|
state->pLoadInfo->isLast = isLast;
|
||||||
hasVal = tMergeTreeNext(&state->mergeTree);
|
hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
if (TSDBROW_TS(&state->row) <= state->lastTs) {
|
||||||
*pIgnoreEarlierTs = true;
|
*pIgnoreEarlierTs = true;
|
||||||
|
@ -671,54 +674,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
*pIgnoreEarlierTs = false;
|
*pIgnoreEarlierTs = false;
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkRemainingRow) {
|
if (!state->checkRemainingRow) {
|
||||||
bool skipBlock = true;
|
state->checkRemainingRow = true;
|
||||||
|
|
||||||
SBlockData *pBlockData = state->row.pBlockData;
|
|
||||||
|
|
||||||
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
|
|
||||||
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
|
||||||
SColData *pColData = &pBlockData->aColData[colIndex];
|
|
||||||
int16_t cid = pColData->cid;
|
|
||||||
|
|
||||||
if (cid == aCols[inputColIndex]) {
|
|
||||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
|
||||||
SColData *pColData = &pBlockData->aColData[colIndex];
|
|
||||||
int16_t cid = pColData->cid;
|
|
||||||
|
|
||||||
if (inputColIndex < nCols && cid == aCols[inputColIndex]) {
|
|
||||||
if (isLast && (pColData->flag & HAS_VALUE)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
|
||||||
skipBlock = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
++inputColIndex;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
if (skipBlock) {
|
|
||||||
skipRow = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} while (skipRow);
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -504,9 +504,34 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
pIter->iRow += step;
|
pIter->iRow += step;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
bool skipBlock = false;
|
||||||
|
|
||||||
findNextValidRow(pIter, idStr);
|
findNextValidRow(pIter, idStr);
|
||||||
|
|
||||||
if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
|
if (pIter->pBlockLoadInfo->checkRemainingRow) {
|
||||||
|
skipBlock = true;
|
||||||
|
int16_t *aCols = pIter->pBlockLoadInfo->colIds;
|
||||||
|
int nCols = pIter->pBlockLoadInfo->numOfCols;
|
||||||
|
bool isLast = pIter->pBlockLoadInfo->isLast;
|
||||||
|
for (int inputColIndex = 0; inputColIndex < nCols; ++inputColIndex) {
|
||||||
|
for (int colIndex = 0; colIndex < pBlockData->nColData; ++colIndex) {
|
||||||
|
SColData *pColData = &pBlockData->aColData[colIndex];
|
||||||
|
int16_t cid = pColData->cid;
|
||||||
|
|
||||||
|
if (cid == aCols[inputColIndex]) {
|
||||||
|
if (isLast && (pColData->flag & HAS_VALUE)) {
|
||||||
|
skipBlock = false;
|
||||||
|
break;
|
||||||
|
} else if (pColData->flag & (HAS_VALUE | HAS_NULL)) {
|
||||||
|
skipBlock = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
|
||||||
tLDataIterNextBlock(pIter, idStr);
|
tLDataIterNextBlock(pIter, idStr);
|
||||||
if (pIter->pSttBlk == NULL) { // no more data
|
if (pIter->pSttBlk == NULL) { // no more data
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
|
Loading…
Reference in New Issue