Merge pull request #22154 from taosdata/enh/tsdb_optimize
fix(tsdb/cache): new next stt row state for fs tranverse
This commit is contained in:
commit
5f25d70fc2
|
@ -45,7 +45,7 @@ typedef void (*TArray2Cb)(void *);
|
|||
#define TARRAY2_GET_PTR(a, i) ((a)->data + i)
|
||||
#define TARRAY2_FIRST(a) ((a)->data[0])
|
||||
#define TARRAY2_LAST(a) ((a)->data[(a)->size - 1])
|
||||
#define TARRAY2_DATA_LEN(a) ((a)->size * sizeof(typeof((a)->data[0])))
|
||||
#define TARRAY2_DATA_LEN(a) ((a)->size * sizeof(((a)->data[0])))
|
||||
|
||||
static FORCE_INLINE int32_t tarray2_make_room(void *arr, int32_t expSize, int32_t eleSize) {
|
||||
TARRAY2(void) *a = arr;
|
||||
|
@ -140,24 +140,23 @@ static FORCE_INLINE int32_t tarray2SortInsert(void *arr, const void *elePtr, int
|
|||
// return (TYPE *)
|
||||
#define TARRAY2_SEARCH(a, ep, cmp, flag) tarray2Search(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp, flag)
|
||||
|
||||
#define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) \
|
||||
tarray2SearchIdx(a, ep, sizeof(typeof((a)->data[0])), (__compar_fn_t)cmp, flag)
|
||||
#define TARRAY2_SEARCH_IDX(a, ep, cmp, flag) tarray2SearchIdx(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp, flag)
|
||||
|
||||
#define TARRAY2_SORT_INSERT(a, e, cmp) tarray2SortInsert(a, &(e), sizeof(((a)->data[0])), (__compar_fn_t)cmp)
|
||||
#define TARRAY2_SORT_INSERT_P(a, ep, cmp) tarray2SortInsert(a, ep, sizeof(((a)->data[0])), (__compar_fn_t)cmp)
|
||||
|
||||
#define TARRAY2_REMOVE(a, idx, cb) \
|
||||
do { \
|
||||
if ((idx) < (a)->size) { \
|
||||
if (cb) { \
|
||||
TArray2Cb cb_ = (TArray2Cb)(cb); \
|
||||
cb_((a)->data + (idx)); \
|
||||
} \
|
||||
if ((idx) < (a)->size - 1) { \
|
||||
memmove((a)->data + (idx), (a)->data + (idx) + 1, sizeof(typeof(*(a)->data)) * ((a)->size - (idx)-1)); \
|
||||
} \
|
||||
(a)->size--; \
|
||||
} \
|
||||
#define TARRAY2_REMOVE(a, idx, cb) \
|
||||
do { \
|
||||
if ((idx) < (a)->size) { \
|
||||
if (cb) { \
|
||||
TArray2Cb cb_ = (TArray2Cb)(cb); \
|
||||
cb_((a)->data + (idx)); \
|
||||
} \
|
||||
if ((idx) < (a)->size - 1) { \
|
||||
memmove((a)->data + (idx), (a)->data + (idx) + 1, sizeof((*(a)->data)) * ((a)->size - (idx)-1)); \
|
||||
} \
|
||||
(a)->size--; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define TARRAY2_FOREACH(a, e) for (int32_t __i = 0; __i < (a)->size && ((e) = (a)->data[__i], 1); __i++)
|
||||
|
|
|
@ -1884,7 +1884,8 @@ typedef enum SFSNEXTROWSTATES {
|
|||
SFSNEXTROW_BRINBLOCK,
|
||||
SFSNEXTROW_BRINRECORD,
|
||||
SFSNEXTROW_BLOCKDATA,
|
||||
SFSNEXTROW_BLOCKROW
|
||||
SFSNEXTROW_BLOCKROW,
|
||||
SFSNEXTROW_NEXTSTTROW
|
||||
} SFSNEXTROWSTATES;
|
||||
|
||||
struct CacheNextRowIter;
|
||||
|
@ -1913,6 +1914,7 @@ typedef struct SFSNextRowIter {
|
|||
int64_t lastTs;
|
||||
SFSLastIter lastIter;
|
||||
SFSLastIter *pLastIter;
|
||||
int8_t lastEmpty;
|
||||
TSDBROW *pLastRow;
|
||||
SRow *pTSRow;
|
||||
SRowMerger rowMerger;
|
||||
|
@ -1974,14 +1976,6 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
goto _err;
|
||||
}
|
||||
|
||||
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
|
||||
state->pr, state->lastTs, aCols, nCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
state->pLastIter = &state->lastIter;
|
||||
|
||||
loadDataTomb(state->pr, state->pr->pFileReader);
|
||||
|
||||
if (!state->pIndexList) {
|
||||
|
@ -2010,16 +2004,64 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
|
|||
|
||||
int indexSize = TARRAY_SIZE(state->pIndexList);
|
||||
if (indexSize <= 0) {
|
||||
// goto next fileset
|
||||
clearLastFileSet(state);
|
||||
state->state = SFSNEXTROW_FILESET;
|
||||
goto _next_fileset;
|
||||
}
|
||||
|
||||
state->state = SFSNEXTROW_INDEXLIST;
|
||||
state->iBrinIndex = indexSize;
|
||||
} else { // empty fileset, goto next fileset
|
||||
// clearLastFileSet(state);
|
||||
}
|
||||
|
||||
code = lastIterOpen(&state->lastIter, state->pFileSet, state->pTsdb, state->pTSchema, state->suid, state->uid,
|
||||
state->pr, state->lastTs, aCols, nCols);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = lastIterNext(&state->lastIter, &state->pLastRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!state->pLastRow) {
|
||||
state->lastEmpty = 1;
|
||||
|
||||
if (SFSNEXTROW_INDEXLIST != state->state) {
|
||||
clearLastFileSet(state);
|
||||
goto _next_fileset;
|
||||
}
|
||||
} else {
|
||||
state->lastEmpty = 0;
|
||||
|
||||
if (SFSNEXTROW_INDEXLIST != state->state) {
|
||||
state->state = SFSNEXTROW_NEXTSTTROW;
|
||||
|
||||
*ppRow = state->pLastRow;
|
||||
state->pLastRow = NULL;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
state->pLastIter = &state->lastIter;
|
||||
}
|
||||
|
||||
if (SFSNEXTROW_NEXTSTTROW == state->state) {
|
||||
code = lastIterNext(&state->lastIter, &state->pLastRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (!state->pLastRow) {
|
||||
lastIterClose(&state->pLastIter);
|
||||
|
||||
clearLastFileSet(state);
|
||||
state->state = SFSNEXTROW_FILESET;
|
||||
goto _next_fileset;
|
||||
} else {
|
||||
*ppRow = state->pLastRow;
|
||||
state->pLastRow = NULL;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue