tsdb/cache: skip invalid datablock directly when traversing fs with last
This commit is contained in:
parent
7743518557
commit
f1414355e3
|
@ -596,7 +596,8 @@ typedef struct {
|
|||
int64_t lastTs;
|
||||
} SFSLastNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
|
||||
int nCols) {
|
||||
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -740,7 +741,8 @@ typedef struct SFSNextRowIter {
|
|||
int64_t lastTs;
|
||||
} SFSNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
|
||||
int nCols) {
|
||||
SFSNextRowIter *state = (SFSNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -828,8 +830,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
}
|
||||
}
|
||||
case SFSNEXTROW_BLOCKDATA:
|
||||
_next_datablock:
|
||||
if (state->iBlock >= 0) {
|
||||
SDataBlk block = {0};
|
||||
bool skipBlock = true;
|
||||
int inputColIndex = 0;
|
||||
|
||||
tDataBlkReset(&block);
|
||||
tBlockDataReset(state->pBlockData);
|
||||
|
@ -854,6 +859,42 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData);
|
||||
if (code) goto _err;
|
||||
|
||||
for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) {
|
||||
SColData *pColData = &state->pBlockData->aColData[colIndex];
|
||||
int16_t cid = pColData->cid;
|
||||
|
||||
if (inputColIndex < nCols && cid == aCols[inputColIndex++]) {
|
||||
if (isLast && pColData->numOfValue != 0) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
} else if (pColData->numOfNone != pColData->nVal) {
|
||||
skipBlock = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (skipBlock) {
|
||||
if (--state->iBlock < 0) {
|
||||
tsdbDataFReaderClose(state->pDataFReader);
|
||||
*state->pDataFReader = NULL;
|
||||
// resetLastBlockLoadInfo(state->pLoadInfo);
|
||||
|
||||
if (state->aBlockIdx) {
|
||||
// taosArrayDestroy(state->aBlockIdx);
|
||||
tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle);
|
||||
|
||||
state->aBlockIdxHandle = NULL;
|
||||
state->aBlockIdx = NULL;
|
||||
}
|
||||
|
||||
state->state = SFSNEXTROW_FILESET;
|
||||
goto _next_fileset;
|
||||
} else {
|
||||
goto _next_datablock;
|
||||
}
|
||||
}
|
||||
|
||||
state->nRow = state->blockData.nRow;
|
||||
state->iRow = state->nRow - 1;
|
||||
|
||||
|
@ -960,7 +1001,8 @@ typedef struct SMemNextRowIter {
|
|||
// TSDBROW *curRow;
|
||||
} SMemNextRowIter;
|
||||
|
||||
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
|
||||
int nCols) {
|
||||
SMemNextRowIter *state = (SMemNextRowIter *)iter;
|
||||
int32_t code = 0;
|
||||
*pIgnoreEarlierTs = false;
|
||||
|
@ -1072,7 +1114,8 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
|
|||
return deleted;
|
||||
}
|
||||
|
||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs);
|
||||
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
|
||||
int nCols);
|
||||
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
|
||||
|
||||
typedef struct {
|
||||
|
@ -1222,12 +1265,14 @@ _err:
|
|||
}
|
||||
|
||||
// iterate next row non deleted backward ts, version (from high to low)
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
|
||||
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
|
||||
int16_t *aCols, int nCols) {
|
||||
int code = 0;
|
||||
for (;;) {
|
||||
for (int i = 0; i < 4; ++i) {
|
||||
if (pIter->input[i].next && !pIter->input[i].stop) {
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs);
|
||||
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs,
|
||||
isLast, aCols, nCols);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pIter->input[i].pRow == NULL) {
|
||||
|
@ -1358,7 +1403,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
|
|||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs);
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, NULL, 0);
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
|
@ -1488,11 +1533,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
bool ignoreEarlierTs = false;
|
||||
SArray *pColArray = NULL;
|
||||
SColVal *pColVal = &(SColVal){0};
|
||||
int16_t nCols = nLastCol;
|
||||
|
||||
int32_t code = initLastColArray(pTSchema, &pColArray);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t));
|
||||
if (NULL == aColArray) {
|
||||
taosArrayDestroy(pColArray);
|
||||
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
|
@ -1502,7 +1554,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
|
||||
do {
|
||||
TSDBROW *pRow = NULL;
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs);
|
||||
nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray));
|
||||
|
||||
if (!pRow) {
|
||||
break;
|
||||
|
@ -1547,9 +1599,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
|
||||
}
|
||||
|
||||
if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
if (!COL_VAL_IS_VALUE(pColVal)) {
|
||||
taosArrayPush(aColArray, &pColVal->cid);
|
||||
if (!setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!setNoneCol) {
|
||||
|
@ -1590,6 +1645,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
}
|
||||
|
||||
taosArraySet(pColArray, iCol, &lastCol);
|
||||
int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ);
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
|
@ -1605,8 +1662,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
|
|||
if (ignoreEarlierTs) {
|
||||
taosArrayDestroy(pColArray);
|
||||
pColArray = NULL;
|
||||
taosArrayDestroy(aColArray);
|
||||
aColArray = NULL;
|
||||
} else {
|
||||
taosArrayClear(pColArray);
|
||||
taosArrayClear(aColArray);
|
||||
}
|
||||
}
|
||||
*ppLastArray = pColArray;
|
||||
|
@ -1621,6 +1681,7 @@ _err:
|
|||
// taosMemoryFreeClear(pTSchema);
|
||||
*ppLastArray = NULL;
|
||||
taosArrayDestroy(pColArray);
|
||||
taosArrayDestroy(aColArray);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue