sync refactor
This commit is contained in:
parent
b05706f9cf
commit
e58812aab5
|
@ -28,7 +28,7 @@ extern "C" {
|
|||
#include "taosdef.h"
|
||||
|
||||
typedef enum EntryType {
|
||||
SYNC_RAFT_ENTRY_NULL = 0,
|
||||
SYNC_RAFT_ENTRY_NOOP = 0,
|
||||
SYNC_RAFT_ENTRY_DATA = 1,
|
||||
SYNC_RAFT_ENTRY_CONFIG = 2,
|
||||
} EntryType;
|
||||
|
@ -49,6 +49,7 @@ typedef struct SSyncRaftEntry {
|
|||
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
|
||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
|
||||
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType);
|
||||
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index);
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
|
||||
|
|
|
@ -198,6 +198,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||
SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index);
|
||||
assert(pRollBackEntry != NULL);
|
||||
assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||
|
@ -217,7 +218,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
|
||||
}
|
||||
}
|
||||
|
@ -242,7 +243,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state);
|
||||
}
|
||||
}
|
||||
|
@ -298,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm->FpCommitCb != NULL) {
|
||||
if (ths->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
|
||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
|
||||
}
|
||||
|
||||
|
|
|
@ -97,7 +97,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
||||
if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
|
||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state);
|
||||
}
|
||||
|
||||
|
|
|
@ -41,10 +41,11 @@ static void syncNodeEqPingTimer(void* param, void* tmrId);
|
|||
static void syncNodeEqElectTimer(void* param, void* tmrId);
|
||||
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
|
||||
|
||||
// on message ----
|
||||
// process message ----
|
||||
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg);
|
||||
static int32_t syncNodeAppendNoop(SSyncNode* ths);
|
||||
|
||||
// life cycle
|
||||
static void syncFreeNode(void* param);
|
||||
|
@ -669,6 +670,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
|||
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
|
||||
syncNodeBecomeLeader(pSyncNode);
|
||||
|
||||
// Raft 3.6.2 Committing entries from previous terms
|
||||
syncNodeAppendNoop(pSyncNode);
|
||||
}
|
||||
|
||||
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
|
||||
|
@ -851,7 +855,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
|
||||
}
|
||||
}
|
||||
|
@ -866,7 +870,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
|||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
if (ths->pFsm != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||
if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) {
|
||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state);
|
||||
}
|
||||
}
|
||||
|
@ -877,6 +881,22 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||
int32_t ret = 0;
|
||||
|
||||
SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1;
|
||||
SyncTerm term = ths->pRaftStore->currentTerm;
|
||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
ths->pLogStore->appendEntry(ths->pLogStore, pEntry);
|
||||
syncNodeReplicate(ths);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void syncFreeNode(void* param) {
|
||||
SSyncNode* pNode = param;
|
||||
syncNodePrint2((char*)"==syncFreeNode==", pNode);
|
||||
|
|
|
@ -51,6 +51,16 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde
|
|||
return pEntry;
|
||||
}
|
||||
|
||||
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index) {
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(0);
|
||||
assert(pEntry != NULL);
|
||||
pEntry->term = term;
|
||||
pEntry->index = index;
|
||||
pEntry->entryType = SYNC_RAFT_ENTRY_NOOP;
|
||||
|
||||
return pEntry;
|
||||
}
|
||||
|
||||
void syncEntryDestory(SSyncRaftEntry* pEntry) {
|
||||
if (pEntry != NULL) {
|
||||
free(pEntry);
|
||||
|
|
|
@ -34,7 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
|||
pLogStore->getLastTerm = logStoreLastTerm;
|
||||
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
|
||||
pLogStore->getCommitIndex = logStoreGetCommitIndex;
|
||||
return pLogStore; // to avoid compiler error
|
||||
return pLogStore;
|
||||
}
|
||||
|
||||
void logStoreDestory(SSyncLogStore* pLogStore) {
|
||||
|
@ -54,12 +54,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
|||
assert(serialized != NULL);
|
||||
|
||||
int code;
|
||||
code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len);
|
||||
code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len);
|
||||
assert(code == 0);
|
||||
|
||||
walFsync(pWal, true);
|
||||
free(serialized);
|
||||
return code; // to avoid compiler error
|
||||
return code;
|
||||
}
|
||||
|
||||
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||
|
|
|
@ -52,7 +52,7 @@ void test3() {
|
|||
pSyncMsg->isWeak = 1;
|
||||
strcpy(pSyncMsg->data, "test3");
|
||||
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NULL);
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NOOP);
|
||||
syncEntryPrint(pEntry);
|
||||
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
|
|
Loading…
Reference in New Issue