enh(sync): add SyncSnapshotRsp SyncSnapshotSend

This commit is contained in:
Minghao Li 2022-05-30 21:21:51 +08:00
parent c5526ef915
commit b9e4543a40
4 changed files with 20 additions and 9 deletions

View File

@ -163,7 +163,7 @@ typedef struct SSyncNode {
// restore state // restore state
// sem_t restoreSem; // sem_t restoreSem;
bool restoreFinish; bool restoreFinish;
SSnapshot* pSnapshot; // SSnapshot* pSnapshot;
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA];

View File

@ -335,8 +335,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = 0x11; cbMeta.flag = 0x11;
SSnapshot snapshot;
ASSERT(ths->pFsm->FpGetSnapshot != NULL);
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
bool needExecute = true; bool needExecute = true;
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { if (cbMeta.index <= snapshot.lastApplyIndex) {
needExecute = false; needExecute = false;
} }

View File

@ -113,8 +113,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 0x1; cbMeta.flag = 0x1;
SSnapshot snapshot;
ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL);
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
bool needExecute = true; bool needExecute = true;
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { if (cbMeta.index <= snapshot.lastApplyIndex) {
needExecute = false; needExecute = false;
} }

View File

@ -575,11 +575,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// restore state // restore state
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
pSyncNode->pSnapshot = NULL;
if (pSyncNode->pFsm->FpGetSnapshot != NULL) { // pSyncNode->pSnapshot = NULL;
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); // if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); // pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
} // pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
// }
// tsem_init(&(pSyncNode->restoreSem), 0, 0); // tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart // start in syncNodeStart
@ -673,9 +674,11 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pFsm); taosMemoryFree(pSyncNode->pFsm);
} }
/*
if (pSyncNode->pSnapshot != NULL) { if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot); taosMemoryFree(pSyncNode->pSnapshot);
} }
*/
// tsem_destroy(&pSyncNode->restoreSem); // tsem_destroy(&pSyncNode->restoreSem);