From f1414355e367c923a2894b773c121e4411b4af1d Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 15 Mar 2023 16:24:36 +0800 Subject: [PATCH] tsdb/cache: skip invalid datablock directly when traversing fs with last --- source/dnode/vnode/src/tsdb/tsdbCache.c | 83 +++++++++++++++++++++---- 1 file changed, 72 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ab94a5e1c7..556da75a12 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -596,7 +596,8 @@ typedef struct { int64_t lastTs; } SFSLastNextRowIter; -static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { +static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; int32_t code = 0; @@ -740,7 +741,8 @@ typedef struct SFSNextRowIter { int64_t lastTs; } SFSNextRowIter; -static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { +static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { SFSNextRowIter *state = (SFSNextRowIter *)iter; int32_t code = 0; @@ -828,8 +830,11 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie } } case SFSNEXTROW_BLOCKDATA: + _next_datablock: if (state->iBlock >= 0) { SDataBlk block = {0}; + bool skipBlock = true; + int inputColIndex = 0; tDataBlkReset(&block); tBlockDataReset(state->pBlockData); @@ -854,6 +859,42 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie code = tsdbReadDataBlock(*state->pDataFReader, &block, state->pBlockData); if (code) goto _err; + for (int colIndex = 0; colIndex < state->pBlockData->nColData; ++colIndex) { + SColData *pColData = &state->pBlockData->aColData[colIndex]; + int16_t cid = pColData->cid; + + if (inputColIndex < nCols && cid == aCols[inputColIndex++]) { + if (isLast && pColData->numOfValue != 0) { + skipBlock = false; + break; + } else if (pColData->numOfNone != pColData->nVal) { + skipBlock = false; + break; + } + } + } + + if (skipBlock) { + if (--state->iBlock < 0) { + tsdbDataFReaderClose(state->pDataFReader); + *state->pDataFReader = NULL; + // resetLastBlockLoadInfo(state->pLoadInfo); + + if (state->aBlockIdx) { + // taosArrayDestroy(state->aBlockIdx); + tsdbBICacheRelease(state->pTsdb->biCache, state->aBlockIdxHandle); + + state->aBlockIdxHandle = NULL; + state->aBlockIdx = NULL; + } + + state->state = SFSNEXTROW_FILESET; + goto _next_fileset; + } else { + goto _next_datablock; + } + } + state->nRow = state->blockData.nRow; state->iRow = state->nRow - 1; @@ -960,7 +1001,8 @@ typedef struct SMemNextRowIter { // TSDBROW *curRow; } SMemNextRowIter; -static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { +static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { SMemNextRowIter *state = (SMemNextRowIter *)iter; int32_t code = 0; *pIgnoreEarlierTs = false; @@ -1072,7 +1114,8 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { return deleted; } -typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs); +typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols); typedef int32_t (*_next_row_clear_fn_t)(void *iter); typedef struct { @@ -1222,12 +1265,14 @@ _err: } // iterate next row non deleted backward ts, version (from high to low) -static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) { +static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, + int16_t *aCols, int nCols) { int code = 0; for (;;) { for (int i = 0; i < 4; ++i) { if (pIter->input[i].next && !pIter->input[i].stop) { - code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs); + code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs, + isLast, aCols, nCols); if (code) goto _err; if (pIter->input[i].pRow == NULL) { @@ -1358,7 +1403,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo do { TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow, &ignoreEarlierTs); + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, false, NULL, 0); if (!pRow) { break; @@ -1488,11 +1533,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach bool ignoreEarlierTs = false; SArray *pColArray = NULL; SColVal *pColVal = &(SColVal){0}; + int16_t nCols = nLastCol; int32_t code = initLastColArray(pTSchema, &pColArray); if (TSDB_CODE_SUCCESS != code) { return code; } + SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); + if (NULL == aColArray) { + taosArrayDestroy(pColArray); + + return TSDB_CODE_OUT_OF_MEMORY; + } TSKEY lastRowTs = TSKEY_MAX; @@ -1502,7 +1554,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach do { TSDBROW *pRow = NULL; - nextRowIterGet(&iter, &pRow, &ignoreEarlierTs); + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); if (!pRow) { break; @@ -1547,9 +1599,12 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } - if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { - noneCol = iCol; - setNoneCol = true; + if (!COL_VAL_IS_VALUE(pColVal)) { + taosArrayPush(aColArray, &pColVal->cid); + if (!setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } } } if (!setNoneCol) { @@ -1590,6 +1645,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach } taosArraySet(pColArray, iCol, &lastCol); + int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); + taosArrayRemove(aColArray, aColIndex); } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { noneCol = iCol; setNoneCol = true; @@ -1605,8 +1662,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach if (ignoreEarlierTs) { taosArrayDestroy(pColArray); pColArray = NULL; + taosArrayDestroy(aColArray); + aColArray = NULL; } else { taosArrayClear(pColArray); + taosArrayClear(aColArray); } } *ppLastArray = pColArray; @@ -1621,6 +1681,7 @@ _err: // taosMemoryFreeClear(pTSchema); *ppLastArray = NULL; taosArrayDestroy(pColArray); + taosArrayDestroy(aColArray); return code; }