tsdbCache: support multi-last data reading
This commit is contained in:
parent
39215ec102
commit
94f4d746b4
|
@ -418,31 +418,16 @@ typedef enum {
|
||||||
} SFSLASTNEXTROWSTATES;
|
} SFSLASTNEXTROWSTATES;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SFSLASTNEXTROWSTATES state; // [input]
|
SFSLASTNEXTROWSTATES state; // [input]
|
||||||
STsdb *pTsdb; // [input]
|
STsdb *pTsdb; // [input]
|
||||||
SBlockIdx *pBlockIdxExp; // [input]
|
|
||||||
STSchema *pTSchema; // [input]
|
|
||||||
tb_uid_t suid;
|
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
int32_t nFileSet;
|
int32_t nFileSet;
|
||||||
int32_t iFileSet;
|
int32_t iFileSet;
|
||||||
SArray *aDFileSet;
|
SArray *aDFileSet;
|
||||||
SDataFReader *pDataFReader;
|
SDataFReader *pDataFReader;
|
||||||
SArray *aBlockL;
|
|
||||||
SBlockL *pBlockL;
|
|
||||||
SBlockData *pBlockDataL;
|
|
||||||
SBlockData blockDataL;
|
|
||||||
int32_t nRow;
|
|
||||||
int32_t iRow;
|
|
||||||
TSDBROW row;
|
TSDBROW row;
|
||||||
/*
|
|
||||||
SArray *aBlockIdx;
|
SMergeTree mergeTree;
|
||||||
SBlockIdx *pBlockIdx;
|
|
||||||
SMapData blockMap;
|
|
||||||
int32_t nBlock;
|
|
||||||
int32_t iBlock;
|
|
||||||
SBlock block;
|
|
||||||
*/
|
|
||||||
} SFSLastNextRowIter;
|
} SFSLastNextRowIter;
|
||||||
|
|
||||||
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
|
@ -451,22 +436,16 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
|
|
||||||
switch (state->state) {
|
switch (state->state) {
|
||||||
case SFSLASTNEXTROW_FS:
|
case SFSLASTNEXTROW_FS:
|
||||||
// state->aDFileSet = state->pTsdb->pFS->cState->aDFileSet;
|
|
||||||
state->nFileSet = taosArrayGetSize(state->aDFileSet);
|
state->nFileSet = taosArrayGetSize(state->aDFileSet);
|
||||||
state->iFileSet = state->nFileSet;
|
state->iFileSet = state->nFileSet;
|
||||||
|
|
||||||
state->pBlockDataL = NULL;
|
|
||||||
|
|
||||||
case SFSLASTNEXTROW_FILESET: {
|
case SFSLASTNEXTROW_FILESET: {
|
||||||
SDFileSet *pFileSet = NULL;
|
SDFileSet *pFileSet = NULL;
|
||||||
_next_fileset:
|
_next_fileset:
|
||||||
if (--state->iFileSet >= 0) {
|
if (--state->iFileSet >= 0) {
|
||||||
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
|
pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet);
|
||||||
} else {
|
} else {
|
||||||
if (state->pBlockDataL) {
|
// tMergeTreeClose(&state->mergeTree);
|
||||||
tBlockDataDestroy(state->pBlockDataL, 1);
|
|
||||||
state->pBlockDataL = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppRow = NULL;
|
*ppRow = NULL;
|
||||||
return code;
|
return code;
|
||||||
|
@ -475,68 +454,24 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
|
||||||
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
if (!state->aBlockL) {
|
tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->uid,
|
||||||
state->aBlockL = taosArrayInit(0, sizeof(SBlockL));
|
&(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX},
|
||||||
} else {
|
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX});
|
||||||
taosArrayClear(state->aBlockL);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
}
|
if (!hasVal) {
|
||||||
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
code = tsdbReadBlockL(state->pDataFReader, 0, state->aBlockL);
|
// tMergeTreeClose(&state->mergeTree);
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
// SBlockL *pBlockL = (SBlockL *)taosArrayGet(state->aBlockL, state->iBlockL);
|
|
||||||
|
|
||||||
state->pBlockL = taosArraySearch(state->aBlockL, state->pBlockIdxExp, tCmprBlockL, TD_EQ);
|
|
||||||
if (!state->pBlockL) {
|
|
||||||
goto _next_fileset;
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t suid = state->pBlockL->suid;
|
|
||||||
int64_t uid = state->pBlockL->maxUid;
|
|
||||||
|
|
||||||
if (!state->pBlockDataL) {
|
|
||||||
state->pBlockDataL = &state->blockDataL;
|
|
||||||
|
|
||||||
tBlockDataCreate(state->pBlockDataL);
|
|
||||||
}
|
|
||||||
code = tBlockDataInit(state->pBlockDataL, suid, suid ? 0 : uid, state->pTSchema);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
case SFSLASTNEXTROW_BLOCKDATA:
|
|
||||||
code = tsdbReadLastBlock(state->pDataFReader, 0, state->pBlockL, state->pBlockDataL);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
state->nRow = state->blockDataL.nRow;
|
|
||||||
state->iRow = state->nRow - 1;
|
|
||||||
|
|
||||||
if (!state->pBlockDataL->uid) {
|
|
||||||
while (state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) {
|
|
||||||
--state->iRow;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
state->state = SFSLASTNEXTROW_BLOCKROW;
|
state->state = SFSLASTNEXTROW_BLOCKROW;
|
||||||
|
}
|
||||||
case SFSLASTNEXTROW_BLOCKROW:
|
case SFSLASTNEXTROW_BLOCKROW:
|
||||||
if (state->pBlockDataL->uid) {
|
state->row = tMergeTreeGetRow(&state->mergeTree);
|
||||||
if (state->iRow >= 0) {
|
*ppRow = &state->row;
|
||||||
state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
*ppRow = &state->row;
|
if (!hasVal) {
|
||||||
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
if (--state->iRow < 0) {
|
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (state->iRow >= 0 && state->pBlockIdxExp->uid == state->pBlockDataL->aUid[state->iRow]) {
|
|
||||||
state->row = tsdbRowFromBlockData(state->pBlockDataL, state->iRow);
|
|
||||||
*ppRow = &state->row;
|
|
||||||
|
|
||||||
if (--state->iRow < 0 || state->pBlockIdxExp->uid != state->pBlockDataL->aUid[state->iRow]) {
|
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
default:
|
default:
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -548,15 +483,6 @@ _err:
|
||||||
tsdbDataFReaderClose(&state->pDataFReader);
|
tsdbDataFReaderClose(&state->pDataFReader);
|
||||||
state->pDataFReader = NULL;
|
state->pDataFReader = NULL;
|
||||||
}
|
}
|
||||||
if (state->aBlockL) {
|
|
||||||
taosArrayDestroy(state->aBlockL);
|
|
||||||
state->aBlockL = NULL;
|
|
||||||
}
|
|
||||||
if (state->pBlockDataL) {
|
|
||||||
tBlockDataDestroy(state->pBlockDataL, 1);
|
|
||||||
state->pBlockDataL = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppRow = NULL;
|
*ppRow = NULL;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -574,14 +500,6 @@ int32_t clearNextRowFromFSLast(void *iter) {
|
||||||
tsdbDataFReaderClose(&state->pDataFReader);
|
tsdbDataFReaderClose(&state->pDataFReader);
|
||||||
state->pDataFReader = NULL;
|
state->pDataFReader = NULL;
|
||||||
}
|
}
|
||||||
if (state->aBlockL) {
|
|
||||||
taosArrayDestroy(state->aBlockL);
|
|
||||||
state->aBlockL = NULL;
|
|
||||||
}
|
|
||||||
if (state->pBlockDataL) {
|
|
||||||
tBlockDataDestroy(state->pBlockDataL, 1);
|
|
||||||
state->pBlockDataL = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -972,9 +890,6 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
|
||||||
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS;
|
||||||
pIter->fsLastState.pTsdb = pTsdb;
|
pIter->fsLastState.pTsdb = pTsdb;
|
||||||
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet;
|
||||||
pIter->fsLastState.pBlockIdxExp = &pIter->idx;
|
|
||||||
pIter->fsLastState.pTSchema = pTSchema;
|
|
||||||
pIter->fsLastState.suid = suid;
|
|
||||||
pIter->fsLastState.uid = uid;
|
pIter->fsLastState.uid = uid;
|
||||||
|
|
||||||
pIter->fsState.state = SFSNEXTROW_FS;
|
pIter->fsState.state = SFSNEXTROW_FS;
|
||||||
|
|
Loading…
Reference in New Issue