tsdb/cache: iterator for mem row

This commit is contained in:
Minglei Jin 2024-12-04 14:14:54 +08:00
parent a0509ab93d
commit 65e6c04a2d
1 changed files with 171 additions and 80 deletions

View File

@ -757,7 +757,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
// 0, tsdbCacheUpdateFromIMem if updateCacheBatch // 0, tsdbCacheUpdateFromIMem if updateCacheBatch
// flush dirty data of lru into rocks with // flush dirty data of lru into rocks
// 4, and update when writing if !updateCacheBatch // 4, and update when writing if !updateCacheBatch
// 5, merge cache & mem if updateCacheBatch // 5, merge cache & mem if updateCacheBatch
@ -2134,6 +2134,173 @@ _exit:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT,
} SMEMNEXTROWSTATES;
typedef struct SMemNextRowIter {
SMEMNEXTROWSTATES state;
STbData *pMem; // [input]
STbDataIter iter; // mem buffer skip list iterator
int64_t lastTs;
} SMemNextRowIter;
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SMemNextRowIter *state = (SMemNextRowIter *)iter;
int32_t code = 0;
*pIgnoreEarlierTs = false;
switch (state->state) {
case SMEMNEXTROW_ENTER: {
if (state->pMem != NULL) {
/*
if (state->pMem->maxKey <= state->lastTs) {
*ppRow = NULL;
*pIgnoreEarlierTs = true;
TAOS_RETURN(code);
}
*/
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
if (pMemRow) {
*ppRow = pMemRow;
state->state = SMEMNEXTROW_NEXT;
TAOS_RETURN(code);
}
}
*ppRow = NULL;
TAOS_RETURN(code);
}
case SMEMNEXTROW_NEXT:
if (tsdbTbDataIterNext(&state->iter)) {
*ppRow = tsdbTbDataIterGet(&state->iter);
TAOS_RETURN(code);
} else {
*ppRow = NULL;
TAOS_RETURN(code);
}
default:
break;
}
_err:
*ppRow = NULL;
TAOS_RETURN(code);
}
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols);
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
typedef struct {
TSDBROW *pRow;
bool stop;
bool next;
bool ignoreEarlierTs;
void *iter;
_next_row_fn_t nextRowFn;
_next_row_clear_fn_t nextRowClearFn;
} TsdbNextRowState;
typedef struct {
SArray *pMemDelData;
SArray *pSkyline;
int64_t iSkyline;
SBlockIdx idx;
SMemNextRowIter memState;
SMemNextRowIter imemState;
TSDBROW memRow, imemRow;
TsdbNextRowState input[2];
SCacheRowsReader *pr;
STsdb *pTsdb;
} MemNextRowIter;
static int32_t MemRowIterOpen(MemNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SArray *pLDataIterArray, STsdbReadSnap *pReadSnap, SCacheRowsReader *pr) {
int32_t code = 0, lino = 0;
STbData *pMem = NULL;
if (pReadSnap->pMem) {
pMem = tsdbGetTbDataFromMemTable(pReadSnap->pMem, suid, uid);
}
STbData *pIMem = NULL;
if (pReadSnap->pIMem) {
pIMem = tsdbGetTbDataFromMemTable(pReadSnap->pIMem, suid, uid);
}
pIter->pTsdb = pTsdb;
pIter->pMemDelData = NULL;
TAOS_CHECK_GOTO(loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer), &lino, _exit);
pIter->idx = (SBlockIdx){.suid = suid, .uid = uid};
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
if (pMem) {
pIter->memState.pMem = pMem;
pIter->memState.state = SMEMNEXTROW_ENTER;
pIter->input[0].stop = false;
pIter->input[0].next = true;
}
if (pIMem) {
pIter->imemState.pMem = pIMem;
pIter->imemState.state = SMEMNEXTROW_ENTER;
pIter->input[1].stop = false;
pIter->input[1].next = true;
}
pIter->pr = pr;
_exit:
if (code) {
tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
}
TAOS_RETURN(code);
}
static void memRowIterClose(MemNextRowIter *pIter) {
for (int i = 0; i < 3; ++i) {
if (pIter->input[i].nextRowClearFn) {
(void)pIter->input[i].nextRowClearFn(pIter->input[i].iter);
}
}
if (pIter->pSkyline) {
taosArrayDestroy(pIter->pSkyline);
}
if (pIter->pMemDelData) {
taosArrayDestroy(pIter->pMemDelData);
}
}
static int32_t MemRowIterGet(MemNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast,
int16_t *aCols, int nCols) {
int32_t code = 0, lino = 0;
_exit:
if (code) {
tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
}
TAOS_RETURN(code);
}
static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, static int32_t tsdbCacheGetBatchFromMem(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr,
int8_t ltype) { int8_t ltype) {
int32_t code = 0; int32_t code = 0;
@ -2160,7 +2327,9 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
TSDB_CHECK_CODE(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); TSDB_CHECK_CODE(tsdbCacheGetBatchFromLru(pTsdb, uid, pLastArray, pr, ltype), lino, _exit);
TSDB_CHECK_CODE(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, ltype), lino, _exit); if (tsUpdateCacheBatch) {
TSDB_CHECK_CODE(tsdbCacheGetBatchFromMem(pTsdb, uid, pLastArray, pr, ltype), lino, _exit);
}
_exit: _exit:
if (code) { if (code) {
@ -3030,83 +3199,6 @@ _err:
TAOS_RETURN(code); TAOS_RETURN(code);
} }
typedef enum SMEMNEXTROWSTATES {
SMEMNEXTROW_ENTER,
SMEMNEXTROW_NEXT,
} SMEMNEXTROWSTATES;
typedef struct SMemNextRowIter {
SMEMNEXTROWSTATES state;
STbData *pMem; // [input]
STbDataIter iter; // mem buffer skip list iterator
int64_t lastTs;
} SMemNextRowIter;
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols) {
SMemNextRowIter *state = (SMemNextRowIter *)iter;
int32_t code = 0;
*pIgnoreEarlierTs = false;
switch (state->state) {
case SMEMNEXTROW_ENTER: {
if (state->pMem != NULL) {
/*
if (state->pMem->maxKey <= state->lastTs) {
*ppRow = NULL;
*pIgnoreEarlierTs = true;
TAOS_RETURN(code);
}
*/
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
if (pMemRow) {
*ppRow = pMemRow;
state->state = SMEMNEXTROW_NEXT;
TAOS_RETURN(code);
}
}
*ppRow = NULL;
TAOS_RETURN(code);
}
case SMEMNEXTROW_NEXT:
if (tsdbTbDataIterNext(&state->iter)) {
*ppRow = tsdbTbDataIterGet(&state->iter);
TAOS_RETURN(code);
} else {
*ppRow = NULL;
TAOS_RETURN(code);
}
default:
break;
}
_err:
*ppRow = NULL;
TAOS_RETURN(code);
}
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
int nCols);
typedef int32_t (*_next_row_clear_fn_t)(void *iter);
typedef struct {
TSDBROW *pRow;
bool stop;
bool next;
bool ignoreEarlierTs;
void *iter;
_next_row_fn_t nextRowFn;
_next_row_clear_fn_t nextRowClearFn;
} TsdbNextRowState;
typedef struct CacheNextRowIter { typedef struct CacheNextRowIter {
SArray *pMemDelData; SArray *pMemDelData;
SArray *pSkyline; SArray *pSkyline;
@ -3385,7 +3477,6 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pI
} }
_err: _err:
if (code) { if (code) {
tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code)); tsdbError("tsdb/cache: %s failed at line %d since %s.", __func__, lino, tstrerror(code));
} }