From 65e6c04a2dd714626ec5cd5d9fe2b19317a7af4c Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 4 Dec 2024 14:14:54 +0800 Subject: [PATCH] tsdb/cache: iterator for mem row --- source/dnode/vnode/src/tsdb/tsdbCache.c | 251 ++++++++++++++++-------- 1 file changed, 171 insertions(+), 80 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 83fd7b1d02..ade466cf08 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -757,7 +757,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) { int32_t code = 0; // 0, tsdbCacheUpdateFromIMem if updateCacheBatch - // flush dirty data of lru into rocks with + // flush dirty data of lru into rocks // 4, and update when writing if !updateCacheBatch // 5, merge cache & mem if updateCacheBatch @@ -2134,6 +2134,173 @@ _exit: TAOS_RETURN(code); } +typedef enum SMEMNEXTROWSTATES { + SMEMNEXTROW_ENTER, + SMEMNEXTROW_NEXT, +} SMEMNEXTROWSTATES; + +typedef struct SMemNextRowIter { + SMEMNEXTROWSTATES state; + STbData *pMem; // [input] + STbDataIter iter; // mem buffer skip list iterator + int64_t lastTs; +} SMemNextRowIter; + +static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols) { + SMemNextRowIter *state = (SMemNextRowIter *)iter; + int32_t code = 0; + *pIgnoreEarlierTs = false; + switch (state->state) { + case SMEMNEXTROW_ENTER: { + if (state->pMem != NULL) { + /* + if (state->pMem->maxKey <= state->lastTs) { + *ppRow = NULL; + *pIgnoreEarlierTs = true; + + TAOS_RETURN(code); + } + */ + tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); + + TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); + if (pMemRow) { + *ppRow = pMemRow; + state->state = SMEMNEXTROW_NEXT; + + TAOS_RETURN(code); + } + } + + *ppRow = NULL; + + TAOS_RETURN(code); + } + case SMEMNEXTROW_NEXT: + if (tsdbTbDataIterNext(&state->iter)) { + *ppRow = tsdbTbDataIterGet(&state->iter); + + TAOS_RETURN(code); + } else { + *ppRow = NULL; + + TAOS_RETURN(code); + } + default: + break; + } + +_err: + *ppRow = NULL; + + TAOS_RETURN(code); +} + +typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, + int nCols); +typedef int32_t (*_next_row_clear_fn_t)(void *iter); + +typedef struct { + TSDBROW *pRow; + bool stop; + bool next; + bool ignoreEarlierTs; + void *iter; + _next_row_fn_t nextRowFn; + _next_row_clear_fn_t nextRowClearFn; +} TsdbNextRowState; + +typedef struct { + SArray *pMemDelData; + SArray *pSkyline; + int64_t iSkyline; + SBlockIdx idx; + SMemNextRowIter memState; + SMemNextRowIter imemState; + TSDBROW memRow, imemRow; + TsdbNextRowState input[2]; + SCacheRowsReader *pr; + STsdb *pTsdb; +} MemNextRowIter; + +static int32_t MemRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, + SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) { + int32_t code = 0, lino = 0; + + STbData *pMem = NULL; + if (pReadSnap->pMem) { + pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid); + } + + STbData *pIMem = NULL; + if (pReadSnap->pIMem) { + pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid); + } + + pIter->pTsdb = pTsdb; + + pIter->pMemDelData = NULL; + + TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit); + + pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; + + pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL}; + pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL}; + + if (pMem) { + pIter->memState.pMem = pMem; + pIter->memState.state = SMEMNEXTROW_ENTER; + pIter->input[0].stop = false; + pIter->input[0].next = true; + } + + if (pIMem) { + pIter->imemState.pMem = pIMem; + pIter->imemState.state = SMEMNEXTROW_ENTER; + pIter->input[1].stop = false; + pIter->input[1].next = true; + } + + pIter->pr = pr; + +_exit: + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } + + TAOS_RETURN(code); +} + +static void memRowIterClose(MemNextRowIter *pIter) { + for (int i = 0; i < 3; ++i) { + if (pIter->input[i].nextRowClearFn) { + (void)pIter->input[i].nextRowClearFn(pIter->input[i].iter); + } + } + + if (pIter->pSkyline) { + taosArrayDestroy(pIter->pSkyline); + } + + if (pIter->pMemDelData) { + taosArrayDestroy(pIter->pMemDelData); + } +} + +static int32_t MemRowIterGet(MemNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, + int16_t *aCols, int nCols) { + int32_t code = 0, lino = 0; + +_exit: + if (code) { + tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); + } + + TAOS_RETURN(code); +} + static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) { int32_t code = 0; @@ -2160,7 +2327,9 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache TSDB_CHECK_CODE(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); - TSDB_CHECK_CODE(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); + if (tsUpdateCacheBatch) { + TSDB_CHECK_CODE(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); + } _exit: if (code) { @@ -3030,83 +3199,6 @@ _err: TAOS_RETURN(code); } -typedef enum SMEMNEXTROWSTATES { - SMEMNEXTROW_ENTER, - SMEMNEXTROW_NEXT, -} SMEMNEXTROWSTATES; - -typedef struct SMemNextRowIter { - SMEMNEXTROWSTATES state; - STbData *pMem; // [input] - STbDataIter iter; // mem buffer skip list iterator - int64_t lastTs; -} SMemNextRowIter; - -static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols) { - SMemNextRowIter *state = (SMemNextRowIter *)iter; - int32_t code = 0; - *pIgnoreEarlierTs = false; - switch (state->state) { - case SMEMNEXTROW_ENTER: { - if (state->pMem != NULL) { - /* - if (state->pMem->maxKey <= state->lastTs) { - *ppRow = NULL; - *pIgnoreEarlierTs = true; - - TAOS_RETURN(code); - } - */ - tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); - - TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); - if (pMemRow) { - *ppRow = pMemRow; - state->state = SMEMNEXTROW_NEXT; - - TAOS_RETURN(code); - } - } - - *ppRow = NULL; - - TAOS_RETURN(code); - } - case SMEMNEXTROW_NEXT: - if (tsdbTbDataIterNext(&state->iter)) { - *ppRow = tsdbTbDataIterGet(&state->iter); - - TAOS_RETURN(code); - } else { - *ppRow = NULL; - - TAOS_RETURN(code); - } - default: - break; - } - -_err: - *ppRow = NULL; - - TAOS_RETURN(code); -} - -typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, - int nCols); -typedef int32_t (*_next_row_clear_fn_t)(void *iter); - -typedef struct { - TSDBROW *pRow; - bool stop; - bool next; - bool ignoreEarlierTs; - void *iter; - _next_row_fn_t nextRowFn; - _next_row_clear_fn_t nextRowClearFn; -} TsdbNextRowState; - typedef struct CacheNextRowIter { SArray *pMemDelData; SArray *pSkyline; @@ -3385,7 +3477,6 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI } _err: - if (code) { tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); }