refactor(sync): get snapshot and create reader
This commit is contained in:
parent
a8694bd863
commit
5968c6353f
|
@ -107,7 +107,9 @@ typedef struct SSyncFSM {
|
||||||
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
|
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
|
||||||
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
|
||||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
|
||||||
|
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader);
|
||||||
|
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||||
|
|
||||||
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
|
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
|
||||||
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
|
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
|
||||||
|
@ -193,8 +195,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
||||||
bool syncEnvIsStart();
|
bool syncEnvIsStart();
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
bool syncIsRestoreFinish(int64_t rid);
|
bool syncIsRestoreFinish(int64_t rid);
|
||||||
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
|
||||||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
|
||||||
|
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,17 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
|
||||||
|
// TODO:
|
||||||
|
|
||||||
|
// atomic operation
|
||||||
|
// step1. sdbGetCommitInfo
|
||||||
|
// step2. create ppReader with pReaderParam
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
|
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -159,6 +169,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
||||||
pFsm->FpReConfigCb = mndReConfig;
|
pFsm->FpReConfigCb = mndReConfig;
|
||||||
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
||||||
|
pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
|
||||||
pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
|
pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
|
||||||
pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
|
pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
|
||||||
pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
|
pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
|
||||||
|
|
|
@ -353,8 +353,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
char logBuf[256] = {0};
|
char logBuf[256] = {0};
|
||||||
|
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
(*pFsm->FpGetSnapshot)(pFsm, &snapshot);
|
(*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,7 +416,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
||||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||||
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = NULL;
|
pFsm->FpRestoreFinishCb = NULL;
|
||||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||||
|
|
|
@ -238,6 +238,9 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
|
||||||
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
|
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
|
||||||
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
|
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
|
||||||
|
|
||||||
|
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
||||||
|
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
||||||
|
|
||||||
void syncStartNormal(int64_t rid);
|
void syncStartNormal(int64_t rid);
|
||||||
void syncStartStandBy(int64_t rid);
|
void syncStartStandBy(int64_t rid);
|
||||||
|
|
||||||
|
|
|
@ -497,7 +497,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
do {
|
do {
|
||||||
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
|
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
|
|
||||||
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||||
syncNodeHasSnapshot(ths);
|
syncNodeHasSnapshot(ths);
|
||||||
|
@ -710,7 +710,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
|
||||||
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
|
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
|
||||||
// advance commit index to sanpshot first
|
// advance commit index to sanpshot first
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
|
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
|
||||||
SyncIndex commitBegin = ths->commitIndex;
|
SyncIndex commitBegin = ths->commitIndex;
|
||||||
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
||||||
|
|
|
@ -123,7 +123,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
|
syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex);
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
||||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||||
}
|
}
|
||||||
|
@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
ASSERT(pSender != NULL);
|
ASSERT(pSender != NULL);
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(ths);
|
bool hasSnapshot = syncNodeHasSnapshot(ths);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
|
|
||||||
// start sending snapshot first time
|
// start sending snapshot first time
|
||||||
// start here, stop by receiver
|
// start here, stop by receiver
|
||||||
|
@ -209,7 +209,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
|
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||||
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
||||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
// advance commit index to sanpshot first
|
// advance commit index to sanpshot first
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
|
if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
|
||||||
SyncIndex commitBegin = pSyncNode->commitIndex;
|
SyncIndex commitBegin = pSyncNode->commitIndex;
|
||||||
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
SyncIndex commitEnd = snapshot.lastApplyIndex;
|
||||||
|
|
|
@ -809,9 +809,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
pSyncNode->restoreFinish = false;
|
pSyncNode->restoreFinish = false;
|
||||||
|
|
||||||
// pSyncNode->pSnapshot = NULL;
|
// pSyncNode->pSnapshot = NULL;
|
||||||
// if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
// if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
// pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
// pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
||||||
// pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
// pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
||||||
// }
|
// }
|
||||||
// tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
// tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
||||||
|
|
||||||
|
@ -1658,8 +1658,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||||
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
||||||
bool ret = false;
|
bool ret = false;
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
|
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
|
||||||
ret = true;
|
ret = true;
|
||||||
}
|
}
|
||||||
|
@ -1669,19 +1669,19 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
|
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
ASSERT(syncNodeHasSnapshot(pSyncNode));
|
ASSERT(syncNodeHasSnapshot(pSyncNode));
|
||||||
ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL);
|
ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL);
|
||||||
ASSERT(index >= SYNC_INDEX_BEGIN);
|
ASSERT(index >= SYNC_INDEX_BEGIN);
|
||||||
|
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
bool b = (index <= snapshot.lastApplyIndex);
|
bool b = (index <= snapshot.lastApplyIndex);
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
|
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
}
|
}
|
||||||
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||||
|
|
||||||
|
@ -1694,8 +1694,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
|
||||||
if (syncNodeHasSnapshot(pSyncNode)) {
|
if (syncNodeHasSnapshot(pSyncNode)) {
|
||||||
// has snapshot
|
// has snapshot
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||||
|
@ -1747,8 +1747,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
if (syncNodeHasSnapshot(pSyncNode)) {
|
if (syncNodeHasSnapshot(pSyncNode)) {
|
||||||
// has snapshot
|
// has snapshot
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (index > snapshot.lastApplyIndex + 1) {
|
if (index > snapshot.lastApplyIndex + 1) {
|
||||||
|
|
|
@ -124,7 +124,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
|
||||||
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
|
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
|
||||||
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
|
||||||
pSender->replicaIndex = replicaIndex;
|
pSender->replicaIndex = replicaIndex;
|
||||||
pSender->term = pSyncNode->pRaftStore->currentTerm;
|
pSender->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
pSender->privateTerm = taosGetTimestampMs() + 100;
|
pSender->privateTerm = taosGetTimestampMs() + 100;
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
||||||
pSender->finish = false;
|
pSender->finish = false;
|
||||||
} else {
|
} else {
|
||||||
sError("snapshotSenderCreate cannot create sender");
|
sError("snapshotSenderCreate cannot create sender");
|
||||||
|
@ -84,7 +84,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
||||||
pSender->blockLen = 0;
|
pSender->blockLen = 0;
|
||||||
|
|
||||||
// get current snapshot info
|
// get current snapshot info
|
||||||
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
|
||||||
|
|
||||||
sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
|
sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
|
||||||
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
|
||||||
|
@ -558,7 +558,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish");
|
char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish");
|
||||||
|
|
|
@ -36,9 +36,9 @@ void cleanup() { walCleanUp(); }
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -159,7 +159,7 @@ SSyncFSM* createFsm() {
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
|
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
||||||
pFsm->FpSnapshotStartRead = SnapshotStartRead;
|
pFsm->FpSnapshotStartRead = SnapshotStartRead;
|
||||||
pFsm->FpSnapshotStopRead = SnapshotStopRead;
|
pFsm->FpSnapshotStopRead = SnapshotStopRead;
|
||||||
|
|
|
@ -35,9 +35,9 @@ void cleanup() { walCleanUp(); }
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +90,7 @@ SSyncFSM* createFsm() {
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
|
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
||||||
|
|
||||||
pFsm->FpReConfigCb = ReConfigCb;
|
pFsm->FpReConfigCb = ReConfigCb;
|
||||||
|
|
|
@ -54,7 +54,7 @@ void init() {
|
||||||
pSyncNode->pWal = pWal;
|
pSyncNode->pWal = pWal;
|
||||||
|
|
||||||
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||||
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb;
|
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanup() {
|
void cleanup() {
|
||||||
|
|
|
@ -54,7 +54,7 @@ void init() {
|
||||||
pSyncNode->pWal = pWal;
|
pSyncNode->pWal = pWal;
|
||||||
|
|
||||||
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||||
pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb;
|
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanup() {
|
void cleanup() {
|
||||||
|
@ -80,7 +80,7 @@ void test1() {
|
||||||
|
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
||||||
|
@ -146,7 +146,7 @@ void test2() {
|
||||||
|
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
||||||
|
@ -203,7 +203,7 @@ void test3() {
|
||||||
|
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
||||||
|
@ -268,7 +268,7 @@ void test4() {
|
||||||
|
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
||||||
|
@ -335,7 +335,7 @@ void test5() {
|
||||||
|
|
||||||
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
bool hasSnapshot = syncNodeHasSnapshot(pSyncNode);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode);
|
||||||
|
|
|
@ -32,9 +32,9 @@ void cleanup() { walCleanUp(); }
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,7 +75,7 @@ SSyncFSM* createFsm() {
|
||||||
pFsm->FpCommitCb = CommitCb;
|
pFsm->FpCommitCb = CommitCb;
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
return pFsm;
|
return pFsm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ SSyncSnapshotSender* createSender() {
|
||||||
pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead;
|
pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead;
|
||||||
pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead;
|
pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead;
|
||||||
pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead;
|
pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead;
|
||||||
pSyncNode->pFsm->FpGetSnapshot = GetSnapshot;
|
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot;
|
||||||
|
|
||||||
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2);
|
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2);
|
||||||
pSender->start = true;
|
pSender->start = true;
|
||||||
|
|
|
@ -35,9 +35,9 @@ const char *pWalDir = "./syncSnapshotTest_wal";
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
if (pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
pFsm->FpGetSnapshotInfo(pFsm, &snapshot);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ void initFsm() {
|
||||||
pFsm->FpCommitCb = CommitCb;
|
pFsm->FpCommitCb = CommitCb;
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncNode *syncNodeInit() {
|
SSyncNode *syncNodeInit() {
|
||||||
|
|
|
@ -172,7 +172,7 @@ SSyncFSM* createFsm() {
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
|
|
||||||
pFsm->FpReConfigCb = ReConfigCb;
|
pFsm->FpReConfigCb = ReConfigCb;
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
|
||||||
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
||||||
|
|
||||||
pFsm->FpSnapshotStartRead = SnapshotStartRead;
|
pFsm->FpSnapshotStartRead = SnapshotStartRead;
|
||||||
|
|
Loading…
Reference in New Issue