Merge pull request #13995 from taosdata/feature/3.0_mhli

refactor(sync): get snapshot and create reader
This commit is contained in:
Li Minghao 2022-06-20 11:30:06 +08:00 committed by GitHub
commit 8a15c2adb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 65 additions and 50 deletions

View File

@ -107,7 +107,9 @@ typedef struct SSyncFSM {
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
@ -193,8 +195,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);

View File

@ -73,7 +73,17 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
} }
} }
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
// TODO:
// atomic operation
// step1. sdbGetCommitInfo
// step2. create ppReader with pReaderParam
return 0;
}
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0; return 0;
@ -159,6 +169,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpReConfigCb = mndReConfig; pFsm->FpReConfigCb = mndReConfig;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
pFsm->FpSnapshotStartRead = mndSnapshotStartRead; pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
pFsm->FpSnapshotStopRead = mndSnapshotStopRead; pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
pFsm->FpSnapshotDoRead = mndSnapshotDoRead; pFsm->FpSnapshotDoRead = mndSnapshotDoRead;

View File

@ -353,8 +353,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0}; char logBuf[256] = {0};
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
(*pFsm->FpGetSnapshot)(pFsm, &snapshot); (*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -416,7 +416,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL; pFsm->FpRestoreFinishCb = NULL;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = vnodeSyncReconfig;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;

View File

@ -238,6 +238,9 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
void syncStartNormal(int64_t rid); void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid); void syncStartStandBy(int64_t rid);

View File

@ -497,7 +497,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do { do {
SyncIndex myLastIndex = syncNodeGetLastIndex(ths); SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths); syncNodeHasSnapshot(ths);
@ -710,7 +710,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first // advance commit index to sanpshot first
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex; SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;

View File

@ -123,7 +123,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) { if (gRaftDetailLog) {
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm); snapshot.lastApplyIndex, snapshot.lastApplyTerm);
} }
@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
bool hasSnapshot = syncNodeHasSnapshot(ths); bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
// start sending snapshot first time // start sending snapshot first time
// start here, stop by receiver // start here, stop by receiver
@ -209,7 +209,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) { if (gRaftDetailLog) {
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm); snapshot.lastApplyIndex, snapshot.lastApplyTerm);
} }

View File

@ -50,7 +50,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// advance commit index to sanpshot first // advance commit index to sanpshot first
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) { if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
SyncIndex commitBegin = pSyncNode->commitIndex; SyncIndex commitBegin = pSyncNode->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;

View File

@ -809,9 +809,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
// pSyncNode->pSnapshot = NULL; // pSyncNode->pSnapshot = NULL;
// if (pSyncNode->pFsm->FpGetSnapshot != NULL) { // if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
// pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); // pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
// pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); // pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot);
// } // }
// tsem_init(&(pSyncNode->restoreSem), 0, 0); // tsem_init(&(pSyncNode->restoreSem), 0, 0);
@ -1658,8 +1658,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
ret = true; ret = true;
} }
@ -1669,19 +1669,19 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(syncNodeHasSnapshot(pSyncNode)); ASSERT(syncNodeHasSnapshot(pSyncNode));
ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL);
ASSERT(index >= SYNC_INDEX_BEGIN); ASSERT(index >= SYNC_INDEX_BEGIN);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
bool b = (index <= snapshot.lastApplyIndex); bool b = (index <= snapshot.lastApplyIndex);
return b; return b;
} }
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -1694,8 +1694,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
if (syncNodeHasSnapshot(pSyncNode)) { if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot // has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
@ -1747,8 +1747,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if (syncNodeHasSnapshot(pSyncNode)) { if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot // has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
if (index > snapshot.lastApplyIndex + 1) { if (index > snapshot.lastApplyIndex + 1) {

View File

@ -124,7 +124,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
if (gRaftDetailLog) { if (gRaftDetailLog) {
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm); snapshot.lastApplyIndex, snapshot.lastApplyTerm);
} }

View File

@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->replicaIndex = replicaIndex; pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = taosGetTimestampMs() + 100; pSender->privateTerm = taosGetTimestampMs() + 100;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false; pSender->finish = false;
} else { } else {
sError("snapshotSenderCreate cannot create sender"); sError("snapshotSenderCreate cannot create sender");
@ -84,7 +84,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->blockLen = 0; pSender->blockLen = 0;
// get current snapshot info // get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld", sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
@ -558,7 +558,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
} }
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
do { do {
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish"); char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish");

View File

@ -36,9 +36,9 @@ void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -159,7 +159,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpSnapshotStartRead = SnapshotStartRead; pFsm->FpSnapshotStartRead = SnapshotStartRead;
pFsm->FpSnapshotStopRead = SnapshotStopRead; pFsm->FpSnapshotStopRead = SnapshotStopRead;

View File

@ -35,9 +35,9 @@ void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -90,7 +90,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpReConfigCb = ReConfigCb; pFsm->FpReConfigCb = ReConfigCb;

View File

@ -54,7 +54,7 @@ void init() {
pSyncNode->pWal = pWal; pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
void cleanup() { void cleanup() {

View File

@ -54,7 +54,7 @@ void init() {
pSyncNode->pWal = pWal; pSyncNode->pWal = pWal;
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
void cleanup() { void cleanup() {
@ -80,7 +80,7 @@ void test1() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -146,7 +146,7 @@ void test2() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -203,7 +203,7 @@ void test3() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -268,7 +268,7 @@ void test4() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
@ -335,7 +335,7 @@ void test5() {
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);

View File

@ -32,9 +32,9 @@ void cleanup() { walCleanUp(); }
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -75,7 +75,7 @@ SSyncFSM* createFsm() {
pFsm->FpCommitCb = CommitCb; pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
return pFsm; return pFsm;
} }

View File

@ -40,7 +40,7 @@ SSyncSnapshotSender* createSender() {
pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead; pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead;
pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead; pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead;
pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead; pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead;
pSyncNode->pFsm->FpGetSnapshot = GetSnapshot; pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot;
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2); SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2);
pSender->start = true; pSender->start = true;

View File

@ -35,9 +35,9 @@ const char *pWalDir = "./syncSnapshotTest_wal";
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot; SSnapshot snapshot;
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
@ -79,7 +79,7 @@ void initFsm() {
pFsm->FpCommitCb = CommitCb; pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
} }
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {

View File

@ -172,7 +172,7 @@ SSyncFSM* createFsm() {
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpReConfigCb = ReConfigCb; pFsm->FpReConfigCb = ReConfigCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshotInfo = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpSnapshotStartRead = SnapshotStartRead; pFsm->FpSnapshotStartRead = SnapshotStartRead;