From b05706f9cf6727ad99fd1ce1515d1293a1f1404f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 23 Mar 2022 18:53:58 +0800 Subject: [PATCH 1/6] sync refactor --- source/libs/sync/inc/syncRaftEntry.h | 8 ++++++++ source/libs/sync/src/syncRaftEntry.c | 9 +++++++++ source/libs/sync/test/syncEntryTest.cpp | 20 ++++++++++++++++++-- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 037a49f45c..876859216f 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -27,6 +27,12 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" +typedef enum EntryType { + SYNC_RAFT_ENTRY_NULL = 0, + SYNC_RAFT_ENTRY_DATA = 1, + SYNC_RAFT_ENTRY_CONFIG = 2, +} EntryType; + typedef struct SSyncRaftEntry { uint32_t bytes; uint32_t msgType; @@ -35,12 +41,14 @@ typedef struct SSyncRaftEntry { bool isWeak; SyncTerm term; SyncIndex index; + EntryType entryType; uint32_t dataLen; char data[]; } SSyncRaftEntry; SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 +SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType); void syncEntryDestory(SSyncRaftEntry* pEntry); char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6 diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index e4c5882e98..30ffebbc32 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -28,6 +28,13 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { // step 4. SyncClientRequest => SSyncRaftEntry, add term, index SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index, SYNC_RAFT_ENTRY_DATA); + assert(pEntry != NULL); + + return pEntry; +} + +SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType) { SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); assert(pEntry != NULL); @@ -37,6 +44,7 @@ SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncInde pEntry->isWeak = pMsg->isWeak; pEntry->term = term; pEntry->index = index; + pEntry->entryType = entryType; pEntry->dataLen = pMsg->dataLen; memcpy(pEntry->data, pMsg->data, pMsg->dataLen); @@ -85,6 +93,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { cJSON_AddStringToObject(pRoot, "term", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index); cJSON_AddStringToObject(pRoot, "index", u64buf); + cJSON_AddNumberToObject(pRoot, "entryType", pEntry->entryType); cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); char* s; diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index e54daaa79a..7f211276f8 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -46,6 +46,20 @@ void test2() { } void test3() { + SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + pSyncMsg->originalRpcType = 33; + pSyncMsg->seqNum = 11; + pSyncMsg->isWeak = 1; + strcpy(pSyncMsg->data, "test3"); + + SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NULL); + syncEntryPrint(pEntry); + + syncClientRequestDestroy(pSyncMsg); + syncEntryDestory(pEntry); +} + +void test4() { SSyncRaftEntry* pEntry = syncEntryBuild(10); assert(pEntry != NULL); pEntry->msgType = 11; @@ -54,7 +68,8 @@ void test3() { pEntry->isWeak = true; pEntry->term = 44; pEntry->index = 55; - strcpy(pEntry->data, "test3"); + pEntry->entryType = SYNC_RAFT_ENTRY_CONFIG; + strcpy(pEntry->data, "test4"); syncEntryPrint(pEntry); uint32_t len; @@ -75,7 +90,8 @@ int main(int argc, char** argv) { test1(); test2(); - test3(); + test3(); + test4(); return 0; } From e58812aab5df79c724ba647d587096c0bfcf9211 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 24 Mar 2022 11:40:36 +0800 Subject: [PATCH 2/6] sync refactor --- source/libs/sync/inc/syncRaftEntry.h | 3 ++- source/libs/sync/src/syncAppendEntries.c | 7 ++++--- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncMain.c | 26 +++++++++++++++++++++--- source/libs/sync/src/syncRaftEntry.c | 10 +++++++++ source/libs/sync/src/syncRaftLog.c | 6 +++--- source/libs/sync/test/syncEntryTest.cpp | 2 +- 7 files changed, 44 insertions(+), 12 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 876859216f..d4e63e9676 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -28,7 +28,7 @@ extern "C" { #include "taosdef.h" typedef enum EntryType { - SYNC_RAFT_ENTRY_NULL = 0, + SYNC_RAFT_ENTRY_NOOP = 0, SYNC_RAFT_ENTRY_DATA = 1, SYNC_RAFT_ENTRY_CONFIG = 2, } EntryType; @@ -49,6 +49,7 @@ typedef struct SSyncRaftEntry { SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4 SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index, EntryType entryType); +SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index); void syncEntryDestory(SSyncRaftEntry* pEntry); char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5 SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6 diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1116c1a905..0a83e09605 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -198,6 +198,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (ths->pFsm->FpRollBackCb != NULL) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); + assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA); SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); @@ -217,7 +218,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } @@ -242,7 +243,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); } } @@ -298,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm->FpCommitCb != NULL) { + if (ths->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index f581b31c4b..6023c00afa 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -97,7 +97,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (pSyncNode->pFsm->FpCommitCb != NULL) { + if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f2341ef521..3080cbcd0d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -41,10 +41,11 @@ static void syncNodeEqPingTimer(void* param, void* tmrId); static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); -// on message ---- +// process message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); +static int32_t syncNodeAppendNoop(SSyncNode* ths); // life cycle static void syncFreeNode(void* param); @@ -669,6 +670,9 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); assert(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode); + + // Raft 3.6.2 Committing entries from previous terms + syncNodeAppendNoop(pSyncNode); } void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { @@ -851,7 +855,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } } @@ -866,7 +870,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL && pEntry->entryType == SYNC_RAFT_ENTRY_DATA) { ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state); } } @@ -877,6 +881,22 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg return ret; } +static int32_t syncNodeAppendNoop(SSyncNode* ths) { + int32_t ret = 0; + + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index); + assert(pEntry != NULL); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + syncNodeReplicate(ths); + } + + return ret; +} + static void syncFreeNode(void* param) { SSyncNode* pNode = param; syncNodePrint2((char*)"==syncFreeNode==", pNode); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 30ffebbc32..9b9842b81d 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -51,6 +51,16 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde return pEntry; } +SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(0); + assert(pEntry != NULL); + pEntry->term = term; + pEntry->index = index; + pEntry->entryType = SYNC_RAFT_ENTRY_NOOP; + + return pEntry; +} + void syncEntryDestory(SSyncRaftEntry* pEntry) { if (pEntry != NULL) { free(pEntry); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index f569c5d621..cad2d5da41 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -34,7 +34,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->getLastTerm = logStoreLastTerm; pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; pLogStore->getCommitIndex = logStoreGetCommitIndex; - return pLogStore; // to avoid compiler error + return pLogStore; } void logStoreDestory(SSyncLogStore* pLogStore) { @@ -54,12 +54,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { assert(serialized != NULL); int code; - code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); + code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len); assert(code == 0); walFsync(pWal, true); free(serialized); - return code; // to avoid compiler error + return code; } SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index 7f211276f8..4acdb6052e 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -52,7 +52,7 @@ void test3() { pSyncMsg->isWeak = 1; strcpy(pSyncMsg->data, "test3"); - SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NULL); + SSyncRaftEntry* pEntry = syncEntryBuild3(pSyncMsg, 100, 200, SYNC_RAFT_ENTRY_NOOP); syncEntryPrint(pEntry); syncClientRequestDestroy(pSyncMsg); From b3ec7c8261d6562a389caabef2f36f4ccaabf6eb Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 24 Mar 2022 17:30:50 +0800 Subject: [PATCH 3/6] sync refactor --- source/libs/sync/src/syncMain.c | 67 ++++-- source/libs/sync/src/syncRaftLog.c | 14 +- source/libs/sync/test/CMakeLists.txt | 14 ++ source/libs/sync/test/syncEntryTest.cpp | 2 +- .../libs/sync/test/syncReplicateLoadTest.cpp | 5 +- source/libs/sync/test/syncReplicateTest.cpp | 12 +- source/libs/sync/test/syncReplicateTest2.cpp | 10 +- source/libs/sync/test/syncReplicateTest3.cpp | 217 ++++++++++++++++++ 8 files changed, 304 insertions(+), 37 deletions(-) create mode 100644 source/libs/sync/test/syncReplicateTest3.cpp diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3080cbcd0d..58e02b3866 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,15 +37,16 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- // enqueue message ---- -static void syncNodeEqPingTimer(void* param, void* tmrId); -static void syncNodeEqElectTimer(void* param, void* tmrId); -static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); +static void syncNodeEqPingTimer(void* param, void* tmrId); +static void syncNodeEqElectTimer(void* param, void* tmrId); +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); +static int32_t syncNodeEqNoop(SSyncNode* ths); +static int32_t syncNodeAppendNoop(SSyncNode* ths); // process message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); -static int32_t syncNodeAppendNoop(SSyncNode* ths); // life cycle static void syncFreeNode(void* param); @@ -673,6 +674,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); + // syncNodeEqNoop(pSyncNode); } void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { @@ -807,6 +809,47 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } } +static int32_t syncNodeEqNoop(SSyncNode* ths) { + int32_t ret = 0; + assert(ths->state == TAOS_SYNC_STATE_LEADER); + + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index); + assert(pEntry != NULL); + + uint32_t entryLen; + char* serialized = syncEntrySerialize(pEntry, &entryLen); + SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen); + assert(pSyncMsg->dataLen == entryLen); + memcpy(pSyncMsg->data, serialized, entryLen); + + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); + ths->FpEqMsg(ths->queue, &rpcMsg); + + free(serialized); + syncClientRequestDestroy(pSyncMsg); + + return ret; +} + +static int32_t syncNodeAppendNoop(SSyncNode* ths) { + int32_t ret = 0; + + SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index); + assert(pEntry != NULL); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + syncNodeReplicate(ths); + } + + return ret; +} + // on message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { int32_t ret = 0; @@ -881,22 +924,6 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg return ret; } -static int32_t syncNodeAppendNoop(SSyncNode* ths) { - int32_t ret = 0; - - SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; - SyncTerm term = ths->pRaftStore->currentTerm; - SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index); - assert(pEntry != NULL); - - if (ths->state == TAOS_SYNC_STATE_LEADER) { - ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - syncNodeReplicate(ths); - } - - return ret; -} - static void syncFreeNode(void* param) { SSyncNode* pNode = param; syncNodePrint2((char*)"==syncFreeNode==", pNode); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index cad2d5da41..a788b91c01 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -48,14 +48,18 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - assert(pEntry->index == logStoreLastIndex(pLogStore) + 1); + SyncIndex lastIndex = logStoreLastIndex(pLogStore); + assert(pEntry->index == lastIndex + 1); uint32_t len; char* serialized = syncEntrySerialize(pEntry, &len); assert(serialized != NULL); - int code; - code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len); - assert(code == 0); + int code = 0; + /* + code = walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len); + assert(code == 0); + */ + assert(walWrite(pWal, pEntry->index, pEntry->entryType, serialized, len) == 0); walFsync(pWal, true); free(serialized); @@ -69,7 +73,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - walReadWithHandle(pWalHandle, index); + assert(walReadWithHandle(pWalHandle, index) == 0); pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); assert(pEntry != NULL); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index dcf380e7e4..7341565b1d 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -34,6 +34,7 @@ add_executable(syncEncodeTest "") add_executable(syncWriteTest "") add_executable(syncReplicateTest "") add_executable(syncReplicateTest2 "") +add_executable(syncReplicateTest3 "") add_executable(syncReplicateLoadTest "") add_executable(syncRefTest "") add_executable(syncLogStoreCheck "") @@ -183,6 +184,10 @@ target_sources(syncReplicateTest2 PRIVATE "syncReplicateTest2.cpp" ) +target_sources(syncReplicateTest3 + PRIVATE + "syncReplicateTest3.cpp" +) target_sources(syncReplicateLoadTest PRIVATE "syncReplicateLoadTest.cpp" @@ -377,6 +382,11 @@ target_include_directories(syncReplicateTest2 "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncReplicateTest3 + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncReplicateLoadTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" @@ -538,6 +548,10 @@ target_link_libraries(syncReplicateTest2 sync gtest_main ) +target_link_libraries(syncReplicateTest3 + sync + gtest_main +) target_link_libraries(syncReplicateLoadTest sync gtest_main diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp index 4acdb6052e..2c8433426a 100644 --- a/source/libs/sync/test/syncEntryTest.cpp +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -90,7 +90,7 @@ int main(int argc, char** argv) { test1(); test2(); - test3(); + test3(); test4(); return 0; diff --git a/source/libs/sync/test/syncReplicateLoadTest.cpp b/source/libs/sync/test/syncReplicateLoadTest.cpp index d53ceca473..b2d3f1a98d 100644 --- a/source/libs/sync/test/syncReplicateLoadTest.cpp +++ b/source/libs/sync/test/syncReplicateLoadTest.cpp @@ -178,9 +178,10 @@ int main(int argc, char **argv) { while (1) { sTrace( "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d", + "electTimerMS:%d, commitIndex:%ld", pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, + pSyncNode->commitIndex); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 47399eeb3c..09aec6534f 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -183,18 +183,20 @@ int main(int argc, char **argv) { taosMsleep(1000); sTrace( - "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d", + "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d, commitIndex:%ld", gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, - gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); + gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS, + gSyncNode->commitIndex); } while (1) { sTrace( "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d", + "electTimerMS:%d, commitIndex:%ld", gSyncNode->state, syncUtilState2String(gSyncNode->state), gSyncNode->pRaftStore->currentTerm, - gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS); + gSyncNode->electTimerLogicClock, gSyncNode->electTimerLogicClockUser, gSyncNode->electTimerMS, + gSyncNode->commitIndex); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncReplicateTest2.cpp b/source/libs/sync/test/syncReplicateTest2.cpp index 09dbc0e2ed..8739a8d12f 100644 --- a/source/libs/sync/test/syncReplicateTest2.cpp +++ b/source/libs/sync/test/syncReplicateTest2.cpp @@ -185,17 +185,19 @@ int main(int argc, char **argv) { sTrace( "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d", + "electTimerMS:%d, commitIndex:%ld", pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, + pSyncNode->commitIndex); } while (1) { sTrace( "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " - "electTimerMS:%d", + "electTimerMS:%d, commitIndex:%ld", pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, + pSyncNode->commitIndex); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncReplicateTest3.cpp b/source/libs/sync/test/syncReplicateTest3.cpp new file mode 100644 index 0000000000..ad2172ccd2 --- /dev/null +++ b/source/libs/sync/test/syncReplicateTest3.cpp @@ -0,0 +1,217 @@ +#define ALLOW_FORBID_FUNC + +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM *pFsm; +SWal * pWal; + +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void initFsm() { + pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; +} + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + // pSyncNode->hbBaseLine = 500; + // pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0(int i) { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 128; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int recordCount = 100; + if (argc >= 3) { + recordCount = atoi(argv[2]); + } + + int sleepMS = 10; + if (argc >= 4) { + sleepMS = atoi(argv[3]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + initFsm(); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char *)"", pSyncNode); + initRaftId(pSyncNode); + + for (int i = 0; i < recordCount; ++i) { + // step0 + SRpcMsg *pMsg0 = step0(i); + syncRpcMsgPrint2((char *)"==step0==", pMsg0); + + syncPropose(rid, pMsg0, true); + taosMsleep(sleepMS); + + free(pMsg0); + + sTrace( + "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d, commitIndex:%ld", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, + pSyncNode->commitIndex); + } + + while (1) { + sTrace( + "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d, commitIndex:%ld", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS, + pSyncNode->commitIndex); + taosMsleep(1000); + } + + return 0; +} From 61f227816d2fd7052e73bbbdd675aa3909a2a3b2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 24 Mar 2022 18:41:20 +0800 Subject: [PATCH 4/6] sync refactor --- source/libs/sync/src/syncRaftLog.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a788b91c01..50b08d0e87 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -87,7 +87,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - walRollback(pWal, fromIndex); + assert(walRollback(pWal, fromIndex) == 0); return 0; // to avoid compiler error } From 83ecb8dd309b4ea2d18af1948a355618d88492b1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 24 Mar 2022 19:40:37 +0800 Subject: [PATCH 5/6] sync refactor --- source/libs/sync/src/syncIO.c | 33 ++++++++++++++++-------------- source/libs/sync/src/syncRaftLog.c | 2 +- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index b05d2e397d..da54aa9668 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void * syncIOConsumerFunc(void *param); +static void *syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -75,6 +75,7 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { syncRpcMsgPrint2(logBuf, pMsg); pMsg->handle = NULL; + pMsg->noResp = 1; rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); return ret; } @@ -234,9 +235,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; + SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { @@ -324,19 +325,21 @@ static void *syncIOConsumerFunc(void *param) { taosGetQitem(qall, (void **)&pRpcMsg); rpcFreeCont(pRpcMsg->pCont); - if (pRpcMsg->handle != NULL) { - int msgSize = 32; - memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.msgType = SYNC_RESPONSE; - rpcMsg.pCont = rpcMallocCont(msgSize); - rpcMsg.contLen = msgSize; - snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); - rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 0; + /* + if (pRpcMsg->handle != NULL) { + int msgSize = 32; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.msgType = SYNC_RESPONSE; + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; - syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg); - rpcSendResponse(&rpcMsg); - } + syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg); + rpcSendResponse(&rpcMsg); + } + */ taosFreeQitem(pRpcMsg); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 50b08d0e87..b28f899aa9 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -111,7 +111,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - walCommit(pWal, index); + assert(walCommit(pWal, index) == 0); return 0; // to avoid compiler error } From 16b44c9e28574561c33ae8e2b957a4fdcd516a30 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 25 Mar 2022 10:37:26 +0800 Subject: [PATCH 6/6] sync refactor --- source/libs/sync/src/syncAppendEntries.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0a83e09605..2e9cd63de6 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -198,7 +198,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (ths->pFsm->FpRollBackCb != NULL) { SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); - assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA); + + // maybe is a NOOP ENTRY + // assert(pRollBackEntry->entryType == SYNC_RAFT_ENTRY_DATA); SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);