feat: detect fs incompleteness of RSMA tsdbs

This commit is contained in:
Benguang Zhao 2023-09-27 15:22:24 +08:00
parent 29bbebc323
commit b80f8201ce
4 changed files with 24 additions and 9 deletions

View File

@ -1025,10 +1025,14 @@ struct STsdbFilterInfo {
TABLEID tbid;
};
enum {
typedef enum {
TSDB_FS_STATE_NORMAL = 0,
TSDB_FS_STATE_INCOMPLETE,
};
} ETsdbFsState;
// utils
ETsdbFsState tsdbSnapGetFsState(SVnode *pVnode);
int32_t tsdbSnapGetDetails(SVnode *pVnode, SSnapshot *pSnap);
#ifdef __cplusplus
}

View File

@ -295,7 +295,6 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
int32_t metaSnapWrite(SMetaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback);
// STsdbSnapReader ========================================
int32_t tsdbSnapGetDetails(SVnode* pVnode, SSnapshot* pSnap);
int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type, void* pRanges,
STsdbSnapReader** ppReader);
int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader);
@ -499,6 +498,7 @@ struct SSma {
#define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb)
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L0])
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb[TSDB_RETENTION_L1])
#define SMA_RSMA_GET_TSDB(pVnode, level) ((level == 0) ? pVnode->pTsdb : pVnode->pSma->pRSmaTsdb[level - 1])
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);

View File

@ -1507,14 +1507,20 @@ void tsdbSnapPartListDestroy(STsdbSnapPartList** ppList) {
ppList[0] = NULL;
}
ETsdbFsState tsdbSnapGetFsState(SVnode* pVnode) {
if (!VND_IS_RSMA(pVnode)) {
return pVnode->pTsdb->pFS->fsstate;
}
for (int32_t lvl = 0; lvl < TSDB_RETENTION_MAX; ++lvl) {
if (SMA_RSMA_GET_TSDB(pVnode, lvl)->pFS->fsstate != TSDB_FS_STATE_NORMAL) {
return TSDB_FS_STATE_INCOMPLETE;
}
}
return TSDB_FS_STATE_NORMAL;
}
int32_t tsdbSnapGetDetails(SVnode* pVnode, SSnapshot* pSnap) {
int code = -1;
if (pVnode->pTsdb->pFS->fsstate == TSDB_FS_STATE_NORMAL) {
pSnap->state = SYNC_FSM_STATE_NORMAL;
} else {
pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
}
STsdb* pTsdb = pVnode->pTsdb;
STsdbSnapPartList* pList = tsdbGetSnapPartList(pTsdb->pFS);
if (pList == NULL) goto _out;

View File

@ -791,6 +791,11 @@ int32_t vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnap) {
pSnap->lastApplyIndex = pVnode->state.committed;
pSnap->lastApplyTerm = pVnode->state.commitTerm;
pSnap->lastConfigIndex = -1;
pSnap->state = SYNC_FSM_STATE_NORMAL;
if (tsdbSnapGetFsState(pVnode) != TSDB_FS_STATE_NORMAL) {
pSnap->state = SYNC_FSM_STATE_INCOMPLETE;
}
if (pSnap->type == TDMT_SYNC_PREP_SNAPSHOT || pSnap->type == TDMT_SYNC_PREP_SNAPSHOT_REPLY) {
code = tsdbSnapGetDetails(pVnode, pSnap);