refact: set FpGetSnapshotInfo return type to void
This commit is contained in:
parent
13bee02129
commit
12b6cf15a0
|
@ -153,7 +153,7 @@ typedef struct SSyncFSM {
|
||||||
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
|
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
|
||||||
|
|
||||||
int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
||||||
int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
void (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||||
|
|
||||||
int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
|
int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
|
||||||
int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);
|
int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);
|
||||||
|
|
|
@ -142,10 +142,9 @@ int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pRe
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
|
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndRestoreFinish(const SSyncFSM *pFsm) {
|
void mndRestoreFinish(const SSyncFSM *pFsm) {
|
||||||
|
|
|
@ -380,9 +380,8 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
static void vnodeSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
vnodeGetSnapshot(pFsm->data, pSnapshot);
|
vnodeGetSnapshot(pFsm->data, pSnapshot);
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
|
@ -539,7 +538,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
||||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||||
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||||
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
|
||||||
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
||||||
pFsm->FpLeaderTransferCb = NULL;
|
pFsm->FpLeaderTransferCb = NULL;
|
||||||
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
|
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
|
||||||
|
|
|
@ -815,11 +815,9 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
|
||||||
ASSERTS(pNode->pLogStore != NULL, "log store not created");
|
ASSERTS(pNode->pLogStore != NULL, "log store not created");
|
||||||
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
||||||
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {0};
|
||||||
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
|
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||||
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
||||||
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
|
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
|
||||||
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
|
||||||
|
@ -1029,11 +1027,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
|
||||||
SyncIndex commitIndex = SYNC_INDEX_INVALID;
|
SyncIndex commitIndex = SYNC_INDEX_INVALID;
|
||||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
SSnapshot snapshot = {0};
|
SSnapshot snapshot = {0};
|
||||||
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
if (code != 0) {
|
|
||||||
sError("vgId:%d, failed to get snapshot info, code:%d", pSyncNode->vgId, code);
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
if (snapshot.lastApplyIndex > commitIndex) {
|
if (snapshot.lastApplyIndex > commitIndex) {
|
||||||
commitIndex = snapshot.lastApplyIndex;
|
commitIndex = snapshot.lastApplyIndex;
|
||||||
sNTrace(pSyncNode, "reset commit index by snapshot");
|
sNTrace(pSyncNode, "reset commit index by snapshot");
|
||||||
|
@ -1155,9 +1149,8 @@ _error:
|
||||||
|
|
||||||
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
|
void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {0};
|
||||||
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
ASSERT(code == 0);
|
|
||||||
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
|
if (snapshot.lastApplyIndex > pSyncNode->commitIndex) {
|
||||||
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
pSyncNode->commitIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
|
|
@ -99,8 +99,9 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
|
||||||
return prevLogTerm;
|
return prevLogTerm;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {0};
|
||||||
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) == 0 && prevIndex == snapshot.lastApplyIndex) {
|
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||||
|
if (prevIndex == snapshot.lastApplyIndex) {
|
||||||
return snapshot.lastApplyTerm;
|
return snapshot.lastApplyTerm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,11 +146,9 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
ASSERTS(pNode->pFsm != NULL, "pFsm not registered");
|
||||||
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered");
|
||||||
|
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot = {0};
|
||||||
if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
|
pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
||||||
sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
SyncIndex commitIndex = snapshot.lastApplyIndex;
|
||||||
SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0);
|
SyncTerm commitTerm = TMAX(snapshot.lastApplyTerm, 0);
|
||||||
if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
|
if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) {
|
||||||
|
|
|
@ -47,7 +47,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
pSender->term = pSyncNode->pRaftStore->currentTerm;
|
pSender->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
pSender->startTime = 0;
|
pSender->startTime = 0;
|
||||||
pSender->endTime = 0;
|
pSender->endTime = 0;
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
|
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
|
||||||
|
|
Loading…
Reference in New Issue