From fa8284af733301e3edd8a2856c464feabd5e46b7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 7 Mar 2022 16:17:41 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncEnv.h | 7 +- source/libs/sync/inc/syncRaftStore.h | 24 +--- source/libs/sync/src/syncAppendEntries.c | 133 +++++++++--------- source/libs/sync/src/syncAppendEntriesReply.c | 27 ++-- source/libs/sync/src/syncRaftStore.c | 123 +--------------- source/libs/sync/src/syncRequestVote.c | 45 +++--- source/libs/sync/src/syncRequestVoteReply.c | 35 +++-- 7 files changed, 131 insertions(+), 263 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 2356651a4c..9fbea03265 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -46,12 +46,9 @@ typedef struct SSyncEnv { extern SSyncEnv* gSyncEnv; int32_t syncEnvStart(); - int32_t syncEnvStop(); - -tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); - -void syncEnvStopTimer(tmr_h* pTimer); +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); +void syncEnvStopTimer(tmr_h* pTimer); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index c480486ff0..591a5b9963 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -32,28 +32,18 @@ extern "C" { #define RAFT_STORE_PATH_LEN 128 typedef struct SRaftStore { - SyncTerm currentTerm; - SRaftId voteFor; - // FileFd fd; + SyncTerm currentTerm; + SRaftId voteFor; TdFilePtr pFile; char path[RAFT_STORE_PATH_LEN]; } SRaftStore; SRaftStore *raftStoreOpen(const char *path); - -static int32_t raftStoreInit(SRaftStore *pRaftStore); - -int32_t raftStoreClose(SRaftStore *pRaftStore); - -int32_t raftStorePersist(SRaftStore *pRaftStore); - -static bool raftStoreFileExist(char *path); - -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); - -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); - -void raftStorePrint(SRaftStore *pRaftStore); +int32_t raftStoreClose(SRaftStore *pRaftStore); +int32_t raftStorePersist(SRaftStore *pRaftStore); +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); +void raftStorePrint(SRaftStore *pRaftStore); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 243a566ff0..ba10234a1d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,70 +15,69 @@ #include "syncAppendEntries.h" -int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - // TLA+ Spec - // HandleAppendEntriesRequest(i, j, m) == - // LET logOk == \/ m.mprevLogIndex = 0 - // \/ /\ m.mprevLogIndex > 0 - // /\ m.mprevLogIndex <= Len(log[i]) - // /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term - // IN /\ m.mterm <= currentTerm[i] - // /\ \/ /\ \* reject request - // \/ m.mterm < currentTerm[i] - // \/ /\ m.mterm = currentTerm[i] - // /\ state[i] = Follower - // /\ \lnot logOk - // /\ Reply([mtype |-> AppendEntriesResponse, - // mterm |-> currentTerm[i], - // msuccess |-> FALSE, - // mmatchIndex |-> 0, - // msource |-> i, - // mdest |-> j], - // m) - // /\ UNCHANGED <> - // \/ \* return to follower state - // /\ m.mterm = currentTerm[i] - // /\ state[i] = Candidate - // /\ state' = [state EXCEPT ![i] = Follower] - // /\ UNCHANGED <> - // \/ \* accept request - // /\ m.mterm = currentTerm[i] - // /\ state[i] = Follower - // /\ logOk - // /\ LET index == m.mprevLogIndex + 1 - // IN \/ \* already done with request - // /\ \/ m.mentries = << >> - // \/ /\ m.mentries /= << >> - // /\ Len(log[i]) >= index - // /\ log[i][index].term = m.mentries[1].term - // \* This could make our commitIndex decrease (for - // \* example if we process an old, duplicated request), - // \* but that doesn't really affect anything. - // /\ commitIndex' = [commitIndex EXCEPT ![i] = - // m.mcommitIndex] - // /\ Reply([mtype |-> AppendEntriesResponse, - // mterm |-> currentTerm[i], - // msuccess |-> TRUE, - // mmatchIndex |-> m.mprevLogIndex + - // Len(m.mentries), - // msource |-> i, - // mdest |-> j], - // m) - // /\ UNCHANGED <> - // \/ \* conflict: remove 1 entry - // /\ m.mentries /= << >> - // /\ Len(log[i]) >= index - // /\ log[i][index].term /= m.mentries[1].term - // /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> - // log[i][index2]] - // IN log' = [log EXCEPT ![i] = new] - // /\ UNCHANGED <> - // \/ \* no conflict: append entry - // /\ m.mentries /= << >> - // /\ Len(log[i]) = m.mprevLogIndex - // /\ log' = [log EXCEPT ![i] = - // Append(log[i], m.mentries[1])] - // /\ UNCHANGED <> - // /\ UNCHANGED <> - // -} +// TLA+ Spec +// HandleAppendEntriesRequest(i, j, m) == +// LET logOk == \/ m.mprevLogIndex = 0 +// \/ /\ m.mprevLogIndex > 0 +// /\ m.mprevLogIndex <= Len(log[i]) +// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ /\ \* reject request +// \/ m.mterm < currentTerm[i] +// \/ /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ \lnot logOk +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> FALSE, +// mmatchIndex |-> 0, +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* return to follower state +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Candidate +// /\ state' = [state EXCEPT ![i] = Follower] +// /\ UNCHANGED <> +// \/ \* accept request +// /\ m.mterm = currentTerm[i] +// /\ state[i] = Follower +// /\ logOk +// /\ LET index == m.mprevLogIndex + 1 +// IN \/ \* already done with request +// /\ \/ m.mentries = << >> +// \/ /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term = m.mentries[1].term +// \* This could make our commitIndex decrease (for +// \* example if we process an old, duplicated request), +// \* but that doesn't really affect anything. +// /\ commitIndex' = [commitIndex EXCEPT ![i] = +// m.mcommitIndex] +// /\ Reply([mtype |-> AppendEntriesResponse, +// mterm |-> currentTerm[i], +// msuccess |-> TRUE, +// mmatchIndex |-> m.mprevLogIndex + +// Len(m.mentries), +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +// \/ \* conflict: remove 1 entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) >= index +// /\ log[i][index].term /= m.mentries[1].term +// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |-> +// log[i][index2]] +// IN log' = [log EXCEPT ![i] = new] +// /\ UNCHANGED <> +// \/ \* no conflict: append entry +// /\ m.mentries /= << >> +// /\ Len(log[i]) = m.mprevLogIndex +// /\ log' = [log EXCEPT ![i] = +// Append(log[i], m.mentries[1])] +// /\ UNCHANGED <> +// /\ UNCHANGED <> +// +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {} diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 81c9ea233b..23df8a539c 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,17 +15,16 @@ #include "syncAppendEntriesReply.h" -int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - // TLA+ Spec - // HandleAppendEntriesResponse(i, j, m) == - // /\ m.mterm = currentTerm[i] - // /\ \/ /\ m.msuccess \* successful - // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] - // /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] - // \/ /\ \lnot m.msuccess \* not successful - // /\ nextIndex' = [nextIndex EXCEPT ![i][j] = - // Max({nextIndex[i][j] - 1, 1})] - // /\ UNCHANGED <> - // /\ Discard(m) - // /\ UNCHANGED <> -} +// TLA+ Spec +// HandleAppendEntriesResponse(i, j, m) == +// /\ m.mterm = currentTerm[i] +// /\ \/ /\ m.msuccess \* successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] +// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex] +// \/ /\ \lnot m.msuccess \* not successful +// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = +// Max({nextIndex[i][j] - 1, 1})] +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {} diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 59c85c38de..7154a21bd1 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -16,8 +16,11 @@ #include "syncRaftStore.h" #include "cJSON.h" -// to complie success: FileIO interface is modified +// private function +static int32_t raftStoreInit(SRaftStore *pRaftStore); +static bool raftStoreFileExist(char *path); +// public function SRaftStore *raftStoreOpen(const char *path) { int32_t ret; @@ -137,121 +140,3 @@ void raftStorePrint(SRaftStore *pRaftStore) { raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); printf("%s\n", storeBuf); } - -#if 0 - -SRaftStore *raftStoreOpen(const char *path) { - int32_t ret; - - SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); - if (pRaftStore == NULL) { - sError("raftStoreOpen malloc error"); - return NULL; - } - memset(pRaftStore, 0, sizeof(*pRaftStore)); - snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); - - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - memset(storeBuf, 0, sizeof(storeBuf)); - - if (!raftStoreFileExist(pRaftStore->path)) { - ret = raftStoreInit(pRaftStore); - assert(ret == 0); - } - - pRaftStore->fd = taosOpenFileReadWrite(pRaftStore->path); - if (pRaftStore->fd < 0) { - return NULL; - } - - int len = taosReadFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); - assert(len == RAFT_STORE_BLOCK_SIZE); - - ret = raftStoreDeserialize(pRaftStore, storeBuf, len); - assert(ret == 0); - - return pRaftStore; -} - -static int32_t raftStoreInit(SRaftStore *pRaftStore) { - pRaftStore->fd = taosOpenFileCreateWrite(pRaftStore->path); - if (pRaftStore->fd < 0) { - return -1; - } - - pRaftStore->currentTerm = 0; - pRaftStore->voteFor.addr = 0; - pRaftStore->voteFor.vgId = 0; - - int32_t ret = raftStorePersist(pRaftStore); - assert(ret == 0); - - taosCloseFile(pRaftStore->fd); - return 0; -} - -int32_t raftStoreClose(SRaftStore *pRaftStore) { - taosCloseFile(pRaftStore->fd); - free(pRaftStore); - return 0; -} - -int32_t raftStorePersist(SRaftStore *pRaftStore) { - int32_t ret; - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - - ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - assert(ret == 0); - - taosLSeekFile(pRaftStore->fd, 0, SEEK_SET); - - ret = taosWriteFile(pRaftStore->fd, storeBuf, sizeof(storeBuf)); - assert(ret == RAFT_STORE_BLOCK_SIZE); - - fsync(pRaftStore->fd); - return 0; -} - -static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } - -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { - cJSON *pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); - cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); - cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); - - char *serialized = cJSON_Print(pRoot); - int len2 = strlen(serialized); - assert(len2 < len); - memset(buf, 0, len); - snprintf(buf, len, "%s", serialized); - free(serialized); - - cJSON_Delete(pRoot); - return 0; -} - -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { - assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); - cJSON *pRoot = cJSON_Parse(buf); - - cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); - pRaftStore->currentTerm = pCurrentTerm->valueint; - - cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); - pRaftStore->voteFor.addr = pVoteForAddr->valueint; - - cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); - pRaftStore->voteFor.vgId = pVoteForVgid->valueint; - - cJSON_Delete(pRoot); - return 0; -} - -void raftStorePrint(SRaftStore *pRaftStore) { - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - printf("%s\n", storeBuf); -} - -#endif diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 0edd6d2ce4..533043c512 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,26 +15,25 @@ #include "syncRequestVote.h" -int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - // TLA+ Spec - // HandleRequestVoteRequest(i, j, m) == - // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) - // \/ /\ m.mlastLogTerm = LastTerm(log[i]) - // /\ m.mlastLogIndex >= Len(log[i]) - // grant == /\ m.mterm = currentTerm[i] - // /\ logOk - // /\ votedFor[i] \in {Nil, j} - // IN /\ m.mterm <= currentTerm[i] - // /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] - // \/ ~grant /\ UNCHANGED votedFor - // /\ Reply([mtype |-> RequestVoteResponse, - // mterm |-> currentTerm[i], - // mvoteGranted |-> grant, - // \* mlog is used just for the `elections' history variable for - // \* the proof. It would not exist in a real implementation. - // mlog |-> log[i], - // msource |-> i, - // mdest |-> j], - // m) - // /\ UNCHANGED <> -} +// TLA+ Spec +// HandleRequestVoteRequest(i, j, m) == +// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) +// \/ /\ m.mlastLogTerm = LastTerm(log[i]) +// /\ m.mlastLogIndex >= Len(log[i]) +// grant == /\ m.mterm = currentTerm[i] +// /\ logOk +// /\ votedFor[i] \in {Nil, j} +// IN /\ m.mterm <= currentTerm[i] +// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j] +// \/ ~grant /\ UNCHANGED votedFor +// /\ Reply([mtype |-> RequestVoteResponse, +// mterm |-> currentTerm[i], +// mvoteGranted |-> grant, +// \* mlog is used just for the `elections' history variable for +// \* the proof. It would not exist in a real implementation. +// mlog |-> log[i], +// msource |-> i, +// mdest |-> j], +// m) +// /\ UNCHANGED <> +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {} diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 63bba7c480..a5b434dbc5 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,21 +15,20 @@ #include "syncRequestVoteReply.h" -int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - // TLA+ Spec - // HandleRequestVoteResponse(i, j, m) == - // \* This tallies votes even when the current state is not Candidate, but - // \* they won't be looked at, so it doesn't matter. - // /\ m.mterm = currentTerm[i] - // /\ votesResponded' = [votesResponded EXCEPT ![i] = - // votesResponded[i] \cup {j}] - // /\ \/ /\ m.mvoteGranted - // /\ votesGranted' = [votesGranted EXCEPT ![i] = - // votesGranted[i] \cup {j}] - // /\ voterLog' = [voterLog EXCEPT ![i] = - // voterLog[i] @@ (j :> m.mlog)] - // \/ /\ ~m.mvoteGranted - // /\ UNCHANGED <> - // /\ Discard(m) - // /\ UNCHANGED <> -} +// TLA+ Spec +// HandleRequestVoteResponse(i, j, m) == +// \* This tallies votes even when the current state is not Candidate, but +// \* they won't be looked at, so it doesn't matter. +// /\ m.mterm = currentTerm[i] +// /\ votesResponded' = [votesResponded EXCEPT ![i] = +// votesResponded[i] \cup {j}] +// /\ \/ /\ m.mvoteGranted +// /\ votesGranted' = [votesGranted EXCEPT ![i] = +// votesGranted[i] \cup {j}] +// /\ voterLog' = [voterLog EXCEPT ![i] = +// voterLog[i] @@ (j :> m.mlog)] +// \/ /\ ~m.mvoteGranted +// /\ UNCHANGED <> +// /\ Discard(m) +// /\ UNCHANGED <> +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {}