tsdb/cache: merge fs & stt row

This commit is contained in:
Minglei Jin 2023-07-19 18:10:36 +08:00
parent 913cfe43e8
commit 4eaecd11de
1 changed files with 36 additions and 0 deletions

View File

@ -1919,6 +1919,8 @@ typedef struct SFSNextRowIter {
SFSLastIter lastIter; SFSLastIter lastIter;
SFSLastIter *pLastIter; SFSLastIter *pLastIter;
TSDBROW *pLastRow; TSDBROW *pLastRow;
SRow *pTSRow;
SRowMerger rowMerger;
SCacheRowsReader *pr; SCacheRowsReader *pr;
struct CacheNextRowIter *pRowIter; struct CacheNextRowIter *pRowIter;
} SFSNextRowIter; } SFSNextRowIter;
@ -1936,6 +1938,11 @@ static void clearLastFileSet(SFSNextRowIter *state) {
if (state->pFileReader) { if (state->pFileReader) {
tsdbDataFileReaderClose(&state->pFileReader); tsdbDataFileReaderClose(&state->pFileReader);
} }
if (state->pTSRow) {
taosMemoryFree(state->pTSRow);
state->pTSRow = NULL;
}
} }
static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols, static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlierTs, bool isLast, int16_t *aCols,
@ -2148,6 +2155,35 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
return code; return code;
} else { } else {
// TODO: merge rows and *ppRow = mergedRow // TODO: merge rows and *ppRow = mergedRow
SRowMerger *pMerger = &state->rowMerger;
tsdbRowMergerInit(pMerger, state->pTSchema);
code = tsdbRowMergerAdd(pMerger, &state->row, state->pTSchema);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
code = tsdbRowMergerAdd(pMerger, state->pLastRow, state->pTSchema);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
if (state->pTSRow) {
taosMemoryFree(state->pTSRow);
state->pTSRow = NULL;
}
code = tsdbRowMergerGetRow(pMerger, &state->pTSRow);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
state->row = tsdbRowFromTSRow(TSDBROW_VERSION(&state->row), state->pTSRow);
*ppRow = &state->row;
--state->iRow;
tsdbRowMergerClear(pMerger);
return code;
} }
} }