Merge pull request #20387 from taosdata/fix/main_bugfix_wxy

enh: optimize last/last_row cost when the cache was first loaded
This commit is contained in:
Shengliang Guan 2023-03-10 14:51:45 +08:00 committed by GitHub
commit cf1bf07bbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 63 additions and 23 deletions

View File

@ -593,9 +593,10 @@ typedef struct {
SMergeTree mergeTree; SMergeTree mergeTree;
SMergeTree *pMergeTree; SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo; SSttBlockLoadInfo *pLoadInfo;
int64_t lastTs;
} SFSLastNextRowIter; } SFSLastNextRowIter;
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter; SFSLastNextRowIter *state = (SFSLastNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
@ -641,15 +642,27 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
} }
state->state = SFSLASTNEXTROW_BLOCKROW; state->state = SFSLASTNEXTROW_BLOCKROW;
} }
case SFSLASTNEXTROW_BLOCKROW: case SFSLASTNEXTROW_BLOCKROW: {
bool hasVal = false;
do {
state->row = tMergeTreeGetRow(&state->mergeTree); state->row = tMergeTreeGetRow(&state->mergeTree);
*ppRow = &state->row; *ppRow = &state->row;
bool hasVal = tMergeTreeNext(&state->mergeTree); hasVal = tMergeTreeNext(&state->mergeTree);
} while (TSDBROW_TS(&state->row) <= state->lastTs && hasVal);
if (TSDBROW_TS(&state->row) <= state->lastTs) {
*pIgnoreEarlierTs = true;
state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset;
}
*pIgnoreEarlierTs = false;
if (!hasVal) { if (!hasVal) {
state->state = SFSLASTNEXTROW_FILESET; state->state = SFSLASTNEXTROW_FILESET;
} }
return code; return code;
}
default: default:
ASSERT(0); ASSERT(0);
break; break;
@ -725,7 +738,7 @@ typedef struct SFSNextRowIter {
int64_t lastTs; int64_t lastTs;
} SFSNextRowIter; } SFSNextRowIter;
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
SFSNextRowIter *state = (SFSNextRowIter *)iter; SFSNextRowIter *state = (SFSNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
@ -821,8 +834,10 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk); tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
if (block.maxKey.ts <= state->lastTs) { if (block.maxKey.ts <= state->lastTs) {
*pIgnoreEarlierTs = true;
goto _next_fileset; goto _next_fileset;
} }
*pIgnoreEarlierTs = false;
tBlockDataReset(state->pBlockData); tBlockDataReset(state->pBlockData);
TABLEID tid = {.suid = state->suid, .uid = state->uid}; TABLEID tid = {.suid = state->suid, .uid = state->uid};
code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0); code = tBlockDataInit(state->pBlockData, &tid, state->pTSchema, NULL, 0);
@ -932,16 +947,23 @@ typedef struct SMemNextRowIter {
SMEMNEXTROWSTATES state; SMEMNEXTROWSTATES state;
STbData *pMem; // [input] STbData *pMem; // [input]
STbDataIter iter; // mem buffer skip list iterator STbDataIter iter; // mem buffer skip list iterator
int64_t lastTs;
// bool iterOpened; // bool iterOpened;
// TSDBROW *curRow; // TSDBROW *curRow;
} SMemNextRowIter; } SMemNextRowIter;
static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow) { static int32_t getNextRowFromMem(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
SMemNextRowIter *state = (SMemNextRowIter *)iter; SMemNextRowIter *state = (SMemNextRowIter *)iter;
int32_t code = 0; int32_t code = 0;
*pIgnoreEarlierTs = false;
switch (state->state) { switch (state->state) {
case SMEMNEXTROW_ENTER: { case SMEMNEXTROW_ENTER: {
if (state->pMem != NULL) { if (state->pMem != NULL) {
if (state->pMem->maxKey <= state->lastTs) {
*ppRow = NULL;
*pIgnoreEarlierTs = true;
return code;
}
tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter); tsdbTbDataIterOpen(state->pMem, NULL, 1, &state->iter);
TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter); TSDBROW *pMemRow = tsdbTbDataIterGet(&state->iter);
@ -1042,13 +1064,14 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) {
return deleted; return deleted;
} }
typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow); typedef int32_t (*_next_row_fn_t)(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs);
typedef int32_t (*_next_row_clear_fn_t)(void *iter); typedef int32_t (*_next_row_clear_fn_t)(void *iter);
typedef struct { typedef struct {
TSDBROW *pRow; TSDBROW *pRow;
bool stop; bool stop;
bool next; bool next;
bool ignoreEarlierTs;
void *iter; void *iter;
_next_row_fn_t nextRowFn; _next_row_fn_t nextRowFn;
_next_row_clear_fn_t nextRowClearFn; _next_row_clear_fn_t nextRowClearFn;
@ -1132,6 +1155,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.uid = uid; pIter->fsLastState.uid = uid;
pIter->fsLastState.pLoadInfo = pLoadInfo; pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReaderLast; pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsLastState.lastTs = lastTs;
pIter->fsState.state = SFSNEXTROW_FS; pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb; pIter->fsState.pTsdb = pTsdb;
@ -1144,16 +1168,17 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsState.pDataFReader = pDataFReader; pIter->fsState.pDataFReader = pDataFReader;
pIter->fsState.lastTs = lastTs; pIter->fsState.lastTs = lastTs;
pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, &pIter->memState, getNextRowFromMem, NULL}; pIter->input[0] = (TsdbNextRowState){&pIter->memRow, true, false, false, &pIter->memState, getNextRowFromMem, NULL};
pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, &pIter->imemState, getNextRowFromMem, NULL}; pIter->input[1] = (TsdbNextRowState){&pIter->imemRow, true, false, false, &pIter->imemState, getNextRowFromMem, NULL};
pIter->input[2] = (TsdbNextRowState){&pIter->fsLastRow, false, true, &pIter->fsLastState, getNextRowFromFSLast, pIter->input[2] = (TsdbNextRowState){
clearNextRowFromFSLast}; &pIter->fsLastRow, false, true, false, &pIter->fsLastState, getNextRowFromFSLast, clearNextRowFromFSLast};
pIter->input[3] = pIter->input[3] =
(TsdbNextRowState){&pIter->fsRow, false, true, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS}; (TsdbNextRowState){&pIter->fsRow, false, true, false, &pIter->fsState, getNextRowFromFS, clearNextRowFromFS};
if (pMem) { if (pMem) {
pIter->memState.pMem = pMem; pIter->memState.pMem = pMem;
pIter->memState.state = SMEMNEXTROW_ENTER; pIter->memState.state = SMEMNEXTROW_ENTER;
pIter->memState.lastTs = lastTs;
pIter->input[0].stop = false; pIter->input[0].stop = false;
pIter->input[0].next = true; pIter->input[0].next = true;
} }
@ -1161,6 +1186,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
if (pIMem) { if (pIMem) {
pIter->imemState.pMem = pIMem; pIter->imemState.pMem = pIMem;
pIter->imemState.state = SMEMNEXTROW_ENTER; pIter->imemState.state = SMEMNEXTROW_ENTER;
pIter->imemState.lastTs = lastTs;
pIter->input[1].stop = false; pIter->input[1].stop = false;
pIter->input[1].next = true; pIter->input[1].next = true;
} }
@ -1188,12 +1214,12 @@ _err:
} }
// iterate next row non deleted backward ts, version (from high to low) // iterate next row non deleted backward ts, version (from high to low)
static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow, bool *pIgnoreEarlierTs) {
int code = 0; int code = 0;
for (;;) { for (;;) {
for (int i = 0; i < 4; ++i) { for (int i = 0; i < 4; ++i) {
if (pIter->input[i].next && !pIter->input[i].stop) { if (pIter->input[i].next && !pIter->input[i].stop) {
code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow); code = pIter->input[i].nextRowFn(pIter->input[i].iter, &pIter->input[i].pRow, &pIter->input[i].ignoreEarlierTs);
if (code) goto _err; if (code) goto _err;
if (pIter->input[i].pRow == NULL) { if (pIter->input[i].pRow == NULL) {
@ -1205,6 +1231,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) { if (pIter->input[0].stop && pIter->input[1].stop && pIter->input[2].stop && pIter->input[3].stop) {
*ppRow = NULL; *ppRow = NULL;
*pIgnoreEarlierTs = (pIter->input[0].ignoreEarlierTs || pIter->input[1].ignoreEarlierTs ||
pIter->input[2].ignoreEarlierTs || pIter->input[3].ignoreEarlierTs);
return code; return code;
} }
@ -1305,6 +1333,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
int16_t noneCol = 0; int16_t noneCol = 0;
bool setNoneCol = false; bool setNoneCol = false;
bool hasRow = false; bool hasRow = false;
bool ignoreEarlierTs = false;
SArray *pColArray = NULL; SArray *pColArray = NULL;
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
@ -1321,7 +1350,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow); nextRowIterGet(&iter, &pRow, &ignoreEarlierTs);
if (!pRow) { if (!pRow) {
break; break;
@ -1421,8 +1450,13 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
// taosArrayDestroy(pColArray); // taosArrayDestroy(pColArray);
//} else { //} else {
if (!hasRow) { if (!hasRow) {
if (ignoreEarlierTs) {
taosArrayDestroy(pColArray);
pColArray = NULL;
} else {
taosArrayClear(pColArray); taosArrayClear(pColArray);
} }
}
*ppColArray = pColArray; *ppColArray = pColArray;
//} //}
@ -1443,6 +1477,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
int16_t noneCol = 0; int16_t noneCol = 0;
bool setNoneCol = false; bool setNoneCol = false;
bool hasRow = false; bool hasRow = false;
bool ignoreEarlierTs = false;
SArray *pColArray = NULL; SArray *pColArray = NULL;
SColVal *pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
@ -1459,7 +1494,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
do { do {
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
nextRowIterGet(&iter, &pRow); nextRowIterGet(&iter, &pRow, &ignoreEarlierTs);
if (!pRow) { if (!pRow) {
break; break;
@ -1559,8 +1594,13 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
// taosArrayDestroy(pColArray); // taosArrayDestroy(pColArray);
//} else { //} else {
if (!hasRow) { if (!hasRow) {
if (ignoreEarlierTs) {
taosArrayDestroy(pColArray);
pColArray = NULL;
} else {
taosArrayClear(pColArray); taosArrayClear(pColArray);
} }
}
*ppLastArray = pColArray; *ppLastArray = pColArray;
//} //}
@ -1593,8 +1633,8 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
SArray *pArray = NULL; SArray *pArray = NULL;
bool dup = false; // which is always false for now bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr); code = mergeLastRow(uid, pTsdb, &dup, &pArray, pr);
// if table's empty or error, set handle NULL and return // if table's empty or error or ignore ignore earlier ts, set handle NULL and return
if (code < 0 /* || pArray == NULL*/) { if (code < 0 || pArray == NULL) {
if (!dup && pArray) { if (!dup && pArray) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
@ -1637,8 +1677,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
if (!h) { if (!h) {
SArray *pLastArray = NULL; SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray, pr); code = mergeLast(uid, pTsdb, &pLastArray, pr);
// if table's empty or error, set handle NULL and return // if table's empty or error or ignore ignore earlier ts, set handle NULL and return
if (code < 0 /* || pLastArray == NULL*/) { if (code < 0 || pLastArray == NULL) {
taosThreadMutexUnlock(&pTsdb->lruMutex); taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL; *handle = NULL;