From 0c9e64f0d2002da2404e54d62996fd317ee6246a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 13:18:29 +0800 Subject: [PATCH 1/9] sync refactor --- source/libs/sync/test/CMakeLists.txt | 14 +++ source/libs/sync/test/syncElectTest3.cpp | 138 +++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 source/libs/sync/test/syncElectTest3.cpp diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index f02ba29218..653763b96b 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -29,6 +29,7 @@ add_executable(syncPingTimerTest2 "") add_executable(syncPingSelfTest "") add_executable(syncElectTest "") add_executable(syncElectTest2 "") +add_executable(syncElectTest3 "") add_executable(syncEncodeTest "") add_executable(syncWriteTest "") add_executable(syncReplicateTest "") @@ -159,6 +160,10 @@ target_sources(syncElectTest2 PRIVATE "syncElectTest2.cpp" ) +target_sources(syncElectTest3 + PRIVATE + "syncElectTest3.cpp" +) target_sources(syncEncodeTest PRIVATE "syncEncodeTest.cpp" @@ -332,6 +337,11 @@ target_include_directories(syncElectTest2 "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncElectTest3 + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncEncodeTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" @@ -478,6 +488,10 @@ target_link_libraries(syncElectTest2 sync gtest_main ) +target_link_libraries(syncElectTest3 + sync + gtest_main +) target_link_libraries(syncEncodeTest sync gtest_main diff --git a/source/libs/sync/test/syncElectTest3.cpp b/source/libs/sync/test/syncElectTest3.cpp new file mode 100644 index 0000000000..45c5488c60 --- /dev/null +++ b/source/libs/sync/test/syncElectTest3.cpp @@ -0,0 +1,138 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" +#include "tref.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect3_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./elect3_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + pSyncNode->hbBaseLine = 500; + pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char*)"", pSyncNode); + initRaftId(pSyncNode); + + //--------------------------- + while (1) { + sTrace( + "elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + taosMsleep(1000); + } + + syncNodeRelease(pSyncNode); + + return 0; +} From 559f570ea0cd2bba43e76e9efdb04c2a22f5a880 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 14:43:58 +0800 Subject: [PATCH 2/9] sync refactor --- source/libs/sync/src/syncIO.c | 13 +- source/libs/sync/src/syncMain.c | 2 +- source/libs/sync/src/syncMessage.c | 12 +- source/libs/sync/test/CMakeLists.txt | 14 ++ source/libs/sync/test/syncReplicateTest2.cpp | 199 +++++++++++++++++++ 5 files changed, 223 insertions(+), 17 deletions(-) create mode 100644 source/libs/sync/test/syncReplicateTest2.cpp diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 19121632e2..fcfffea9c3 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void * syncIOConsumerFunc(void *param); +static void *syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; + SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { @@ -257,13 +257,6 @@ static void *syncIOConsumerFunc(void *param) { assert(pSyncMsg != NULL); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); - /* - pSyncMsg = syncPingBuild(pRpcMsg->contLen); - syncPingFromRpcMsg(pRpcMsg, pSyncMsg); - // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); - io->FpOnSyncPing(io->pSyncNode, pSyncMsg); - syncPingDestroy(pSyncMsg); - */ } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b92317ef3f..98cf025093 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -122,7 +122,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ret = 0; } else { - sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); + sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); ret = -1; // todo : need define err code !! } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3ac4a7a1c0..f2344c6b6d 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -834,7 +834,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastLogIndex); cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); @@ -1127,14 +1127,14 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); - cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); - cJSON_AddStringToObject(pRoot, "commit_index", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); char* s; @@ -1288,7 +1288,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddNumberToObject(pRoot, "success", pMsg->success); - snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->matchIndex); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 653763b96b..ec43517974 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -33,6 +33,7 @@ add_executable(syncElectTest3 "") add_executable(syncEncodeTest "") add_executable(syncWriteTest "") add_executable(syncReplicateTest "") +add_executable(syncReplicateTest2 "") add_executable(syncRefTest "") @@ -176,6 +177,10 @@ target_sources(syncReplicateTest PRIVATE "syncReplicateTest.cpp" ) +target_sources(syncReplicateTest2 + PRIVATE + "syncReplicateTest2.cpp" +) target_sources(syncRefTest PRIVATE "syncRefTest.cpp" @@ -357,6 +362,11 @@ target_include_directories(syncReplicateTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncReplicateTest2 + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncRefTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" @@ -504,6 +514,10 @@ target_link_libraries(syncReplicateTest sync gtest_main ) +target_link_libraries(syncReplicateTest2 + sync + gtest_main +) target_link_libraries(syncRefTest sync gtest_main diff --git a/source/libs/sync/test/syncReplicateTest2.cpp b/source/libs/sync/test/syncReplicateTest2.cpp new file mode 100644 index 0000000000..352d2892bf --- /dev/null +++ b/source/libs/sync/test/syncReplicateTest2.cpp @@ -0,0 +1,199 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM *pFsm; +SWal * pWal; + +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, + isWeak, code); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, + index, isWeak, code); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, + isWeak, code); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void initFsm() { + pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; +} + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + // pSyncNode->hbBaseLine = 500; + // pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0(int i) { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 128; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + initFsm(); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char *)"", pSyncNode); + initRaftId(pSyncNode); + + for (int i = 0; i < 30; ++i) { + // step0 + SRpcMsg *pMsg0 = step0(i); + syncRpcMsgPrint2((char *)"==step0==", pMsg0); + + syncPropose(rid, pMsg0, true); + taosMsleep(1000); + + free(pMsg0); + + sTrace( + "syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + } + + while (1) { + sTrace( + "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + taosMsleep(1000); + } + + return 0; +} From 944bb33156535d054794e2cac3e3cbdb8092c7a6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 15:29:32 +0800 Subject: [PATCH 3/9] sync refactor --- source/libs/sync/src/syncAppendEntries.c | 26 ++++++++++++++++++++---- source/libs/sync/src/syncMain.c | 2 +- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e4df93ca47..9922357134 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // reject request if ((pMsg->term < ths->pRaftStore->currentTerm) || ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { + sTrace( + "syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; @@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // return to follower state if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { + sTrace( + "syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK); + syncNodeBecomeFollower(ths); // ret or reply? @@ -159,6 +169,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntryDestory(pPreEntry); } + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d, preMatch:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch); + if (preMatch) { // must has preIndex in local log assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); @@ -167,7 +182,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { bool hasAppendEntries = pMsg->dataLen > 0; if (hasExtraEntries && hasAppendEntries) { - // conflict + // not conflict by default bool conflict = false; SyncIndex extraIndex = pMsg->prevLogIndex + 1; @@ -177,8 +192,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); assert(pAppendEntry != NULL); + // log not match, conflict assert(extraIndex == pAppendEntry->index); - if (pExtraEntry->term == pAppendEntry->term) { + if (pExtraEntry->term != pAppendEntry->term) { conflict = true; } @@ -187,6 +203,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); SyncIndex delEnd = extraIndex; + sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); + // notice! reverse roll back! for (SyncIndex index = delEnd; index >= delBegin; --index) { if (ths->pFsm->FpRollBackCb != NULL) { @@ -212,7 +230,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2); } } rpcFreeCont(rpcMsg.pCont); @@ -237,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3); } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 98cf025093..a896ac5ff4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -863,7 +863,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1); } } rpcFreeCont(rpcMsg.pCont); From 1c3113686e0f00e5a186cc0b9a8d8f4629e05755 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 16:17:17 +0800 Subject: [PATCH 4/9] sync refactor --- include/libs/sync/sync.h | 2 +- source/libs/sync/src/syncMain.c | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a38431a1b2..74d9dfd970 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -35,6 +35,7 @@ typedef enum { TAOS_SYNC_STATE_FOLLOWER = 100, TAOS_SYNC_STATE_CANDIDATE = 101, TAOS_SYNC_STATE_LEADER = 102, + TAOS_SYNC_STATE_ERROR = 103, } ESyncState; typedef struct SSyncBuffer { @@ -160,7 +161,6 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility ESyncState syncGetMyRole(int64_t rid); -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); extern int32_t sDebugFlag; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a896ac5ff4..18213ca62c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -136,13 +136,14 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { } ESyncState syncGetMyRole(int64_t rid) { - // todo : get pointer from rid - SSyncNode* pSyncNode = NULL; + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return TAOS_SYNC_STATE_ERROR; + } + assert(rid == pSyncNode->rid); return pSyncNode->state; } -void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} - // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); From f7432337fd1846a0561adf80b08d44dd0bb0b90f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 16:43:30 +0800 Subject: [PATCH 5/9] sync refactor --- include/libs/sync/sync.h | 23 +++++++++------- source/libs/sync/src/syncAppendEntries.c | 8 +++--- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncIO.c | 6 ++--- source/libs/sync/src/syncMain.c | 4 +-- source/libs/sync/test/syncReplicateTest.cpp | 28 +++++++++++++------- source/libs/sync/test/syncReplicateTest2.cpp | 22 ++++++++------- source/libs/sync/test/syncWriteTest.cpp | 28 +++++++++++++------- 8 files changed, 74 insertions(+), 47 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 74d9dfd970..44efc33dbb 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -69,17 +69,20 @@ typedef struct SSnapshot { typedef struct SSyncFSM { void* data; - // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result + // when value in pMsg finish a raft flow, FpCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state); - // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result + // when value in pMsg has been written into local log store, FpPreCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state); // when log entry is updated by a new one, FpRollBackCb is called // user can do something to roll back. for example, delete data from tsdb, or just ignore it - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state); // user should implement this function, use "data" to take snapshot into "snapshot" int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); @@ -155,11 +158,11 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose ESyncState syncGetMyRole(int64_t rid); extern int32_t sDebugFlag; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 9922357134..960fc6d55e 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -213,7 +213,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0); + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pRollBackEntry); } @@ -230,7 +230,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } rpcFreeCont(rpcMsg.pCont); @@ -255,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); } } rpcFreeCont(rpcMsg.pCont); @@ -326,7 +326,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm->FpCommitCb != NULL) { - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0d4df8e6cf..89b5f0b39c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -91,7 +91,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (pSyncNode->pFsm->FpCommitCb != NULL) { - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state); } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index fcfffea9c3..b05d2e397d 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void *syncIOConsumerFunc(void *param); +static void * syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO *io = param; + SSyncIO * io = param; STaosQall *qall; - SRpcMsg *pRpcMsg, rpcMsg; + SRpcMsg * pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 18213ca62c..bbb0142584 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -849,7 +849,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state); } } rpcFreeCont(rpcMsg.pCont); @@ -864,7 +864,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state); } } rpcFreeCont(rpcMsg.pCont); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index 4d6e6f3a25..47399eeb3c 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -28,19 +28,29 @@ SSyncFSM * pFsm; SWal * pWal; SSyncNode *gSyncNode; -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } void initFsm() { diff --git a/source/libs/sync/test/syncReplicateTest2.cpp b/source/libs/sync/test/syncReplicateTest2.cpp index 352d2892bf..09dbc0e2ed 100644 --- a/source/libs/sync/test/syncReplicateTest2.cpp +++ b/source/libs/sync/test/syncReplicateTest2.cpp @@ -27,24 +27,28 @@ SSyncInfo syncInfo; SSyncFSM *pFsm; SWal * pWal; -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) { +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, - isWeak, code); + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) { +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, - index, isWeak, code); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) { +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, - isWeak, code); + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } diff --git a/source/libs/sync/test/syncWriteTest.cpp b/source/libs/sync/test/syncWriteTest.cpp index 2598abbddd..732bcf140b 100644 --- a/source/libs/sync/test/syncWriteTest.cpp +++ b/source/libs/sync/test/syncWriteTest.cpp @@ -28,19 +28,29 @@ SSyncFSM * pFsm; SWal * pWal; SSyncNode *gSyncNode; -void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } -void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { - printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); - syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); } void initFsm() { From 7c6fbbfbd12689e172cd2fd994a44a4a4a9a5501 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 16:58:36 +0800 Subject: [PATCH 6/9] sync refactor --- include/libs/sync/sync.h | 3 ++ source/libs/sync/src/syncMain.c | 49 +++++++++++++++++---------------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 44efc33dbb..7802aaca96 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -165,6 +165,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // us int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose ESyncState syncGetMyRole(int64_t rid); +// propose with sequence number, to implement linearizable semantics +int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); + extern int32_t sDebugFlag; #ifdef __cplusplus diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bbb0142584..f2341ef521 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -104,29 +104,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = 0; - - // todo : get pointer from rid - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return -1; - } - assert(rid == pSyncNode->rid); - - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); - SRpcMsg rpcMsg; - syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); - syncClientRequestDestroy(pSyncMsg); - ret = 0; - - } else { - sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); - ret = -1; // todo : need define err code !! - } - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + int32_t ret = syncPropose2(rid, pMsg, isWeak, 0); return ret; } @@ -144,6 +122,31 @@ ESyncState syncGetMyRole(int64_t rid) { return pSyncNode->state; } +int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum) { + int32_t ret = 0; + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + assert(rid == pSyncNode->rid); + + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncClientRequestDestroy(pSyncMsg); + ret = 0; + + } else { + sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); + ret = -1; // todo : need define err code !! + } + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return ret; +} + // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); From e3a3c1386be8d5f766ee9e9ecd7ebb326b0b8753 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 17:53:43 +0800 Subject: [PATCH 7/9] sync refactor --- include/libs/sync/sync.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7802aaca96..b2ed9ea3c4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -161,13 +161,15 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose +int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); // propose with sequence number, to implement linearizable semantics int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); +// for compatibility, the same as syncPropose +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); + extern int32_t sDebugFlag; #ifdef __cplusplus From 996886e72679331f83e72e0890f355f39b33db33 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 23 Mar 2022 17:08:07 +0800 Subject: [PATCH 8/9] sync refactor --- source/libs/sync/inc/syncRaftLog.h | 7 + source/libs/sync/src/syncAppendEntries.c | 72 ++++--- source/libs/sync/src/syncAppendEntriesReply.c | 6 + source/libs/sync/src/syncRaftLog.c | 55 +++++ source/libs/sync/src/syncReplication.c | 6 + source/libs/sync/test/CMakeLists.txt | 28 +++ source/libs/sync/test/syncLogStoreCheck.cpp | 107 ++++++++++ .../libs/sync/test/syncReplicateLoadTest.cpp | 188 ++++++++++++++++++ 8 files changed, 437 insertions(+), 32 deletions(-) create mode 100644 source/libs/sync/test/syncLogStoreCheck.cpp create mode 100644 source/libs/sync/test/syncReplicateLoadTest.cpp diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index d979e0df15..2196d6c207 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -47,6 +47,8 @@ SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); +cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); +char* logStoreSimple2Str(SSyncLogStore* pLogStore); // for debug void logStorePrint(SSyncLogStore* pLogStore); @@ -54,6 +56,11 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore); void logStoreLog(SSyncLogStore* pLogStore); void logStoreLog2(char* s, SSyncLogStore* pLogStore); +void logStoreSimplePrint(SSyncLogStore* pLogStore); +void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore); +void logStoreSimpleLog(SSyncLogStore* pLogStore); +void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 960fc6d55e..5ea810180d 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -155,30 +155,35 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - bool preMatch = false; - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && - ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { - preMatch = true; - } - if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); - assert(pPreEntry != NULL); - if (pMsg->prevLogTerm == pPreEntry->term) { - preMatch = true; - } - syncEntryDestory(pPreEntry); - } + /* + bool preMatch = false; + if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && + ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { + preMatch = true; + } + if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= + ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, + pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { preMatch = true; + } + syncEntryDestory(pPreEntry); + } - sTrace( - "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "ths->state:%d, logOK:%d, preMatch:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch); + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "ths->state:%d, logOK:%d, preMatch:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch); - if (preMatch) { - // must has preIndex in local log + if (preMatch) { + */ + + { + // preIndex = -1, or has preIndex entry in local log assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + // has extra entries (> preIndex) in local log bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); + + // has entries in SyncAppendEntries msg bool hasAppendEntries = pMsg->dataLen > 0; if (hasExtraEntries && hasAppendEntries) { @@ -287,21 +292,24 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncAppendEntriesReplyDestroy(pReply); - - } else { - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; - - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); } + /* + else { + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = false; + pReply->matchIndex = SYNC_INDEX_INVALID; + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + } + */ + // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { // has commit entry in local diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 817974fd26..790ac7f8e1 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -48,6 +48,9 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p return ret; } + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex); + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex); + // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); @@ -77,5 +80,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); } + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex); + syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex); + return ret; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 620e923629..f569c5d621 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -165,6 +165,34 @@ char* logStore2Str(SSyncLogStore* pLogStore) { return serialized; } +cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) { + char u64buf[128]; + SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; + cJSON* pRoot = cJSON_CreateObject(); + + if (pData != NULL && pData->pWal != NULL) { + snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot); + return pJson; +} + +char* logStoreSimple2Str(SSyncLogStore* pLogStore) { + cJSON* pJson = logStoreSimple2Json(pLogStore); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + // for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { char* serialized = logStore2Str(pLogStore); @@ -191,3 +219,30 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) { sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized); free(serialized); } + +// for debug ----------------- +void logStoreSimplePrint(SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void logStoreSimpleLog(SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStoreSimple2Str(pLogStore); + sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0bb701fc09..943b268cd3 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -49,6 +49,10 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex); + syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex); + logStoreSimpleLog2("==syncNodeAppendEntriesPeers==", pSyncNode->pLogStore); + int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); @@ -99,6 +103,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { pMsg->prevLogTerm = preLogTerm; pMsg->commitIndex = pSyncNode->commitIndex; + syncAppendEntriesLog2("==syncNodeAppendEntriesPeers==", pMsg); + // send AppendEntries syncNodeAppendEntries(pSyncNode, pDestId, pMsg); syncAppendEntriesDestroy(pMsg); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index ec43517974..dcf380e7e4 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -34,7 +34,9 @@ add_executable(syncEncodeTest "") add_executable(syncWriteTest "") add_executable(syncReplicateTest "") add_executable(syncReplicateTest2 "") +add_executable(syncReplicateLoadTest "") add_executable(syncRefTest "") +add_executable(syncLogStoreCheck "") target_sources(syncTest @@ -181,10 +183,18 @@ target_sources(syncReplicateTest2 PRIVATE "syncReplicateTest2.cpp" ) +target_sources(syncReplicateLoadTest + PRIVATE + "syncReplicateLoadTest.cpp" +) target_sources(syncRefTest PRIVATE "syncRefTest.cpp" ) +target_sources(syncLogStoreCheck + PRIVATE + "syncLogStoreCheck.cpp" +) target_include_directories(syncTest @@ -367,11 +377,21 @@ target_include_directories(syncReplicateTest2 "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncReplicateLoadTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_include_directories(syncRefTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLogStoreCheck + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -518,10 +538,18 @@ target_link_libraries(syncReplicateTest2 sync gtest_main ) +target_link_libraries(syncReplicateLoadTest + sync + gtest_main +) target_link_libraries(syncRefTest sync gtest_main ) +target_link_libraries(syncLogStoreCheck + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncLogStoreCheck.cpp b/source/libs/sync/test/syncLogStoreCheck.cpp new file mode 100644 index 0000000000..5e5e43af01 --- /dev/null +++ b/source/libs/sync/test/syncLogStoreCheck.cpp @@ -0,0 +1,107 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 1; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; +SSyncNode* pSyncNode; + +SSyncNode* syncNodeInit(const char *path) { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./log_check"); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(path, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* logStoreCheck(const char *path) { return syncNodeInit(path); } + + + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + pSyncNode = logStoreCheck(argv[1]); + assert(pSyncNode != NULL); + + logStorePrint2((char*)"logStoreCheck", pSyncNode->pLogStore); + + return 0; +} diff --git a/source/libs/sync/test/syncReplicateLoadTest.cpp b/source/libs/sync/test/syncReplicateLoadTest.cpp new file mode 100644 index 0000000000..d53ceca473 --- /dev/null +++ b/source/libs/sync/test/syncReplicateLoadTest.cpp @@ -0,0 +1,188 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncRaftEntry.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM *pFsm; +SWal * pWal; + +void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak, + code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code, + ESyncState state) { + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", + pFsm, index, isWeak, code, state, syncUtilState2String(state)); + syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg); +} + +void initFsm() { + pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM)); + pFsm->FpCommitCb = CommitCb; + pFsm->FpPreCommitCb = PreCommitCb; + pFsm->FpRollBackCb = RollBackCb; +} + +int64_t syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + + char tmpdir[128]; + snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex); + pWal = walOpen(tmpdir, &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg *pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + int64_t rid = syncStart(&syncInfo); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + // pSyncNode->hbBaseLine = 500; + // pSyncNode->electBaseLine = 1500; + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest; + gSyncIO->pSyncNode = pSyncNode; + + syncNodeRelease(pSyncNode); + + return rid; +} + +void initRaftId(SSyncNode *pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char *s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +SRpcMsg *step0(int i) { + SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg)); + memset(pMsg, 0, sizeof(SRpcMsg)); + pMsg->msgType = 9999; + pMsg->contLen = 128; + pMsg->pCont = malloc(pMsg->contLen); + snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i); + return pMsg; +} + +SyncClientRequest *step1(const SRpcMsg *pMsg) { + SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true); + return pRetMsg; +} + +int main(int argc, char **argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + void logTest(); + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + initFsm(); + + ret = syncInit(); + assert(ret == 0); + + int64_t rid = syncNodeInit(); + assert(rid > 0); + + SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid); + assert(pSyncNode != NULL); + + syncNodePrint2((char *)"", pSyncNode); + initRaftId(pSyncNode); + + // only load ... + + while (1) { + sTrace( + "replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, " + "electTimerMS:%d", + pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, + pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); + taosMsleep(1000); + } + + return 0; +} From d5188f14f9fc7cea5b16a8114ab599fa989a52ee Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 23 Mar 2022 17:56:32 +0800 Subject: [PATCH 9/9] sync refactor --- include/libs/sync/sync.h | 10 +- source/libs/sync/src/syncAppendEntries.c | 225 +++++++++----------- source/libs/sync/src/syncCommit.c | 7 + source/libs/sync/test/syncLogStoreCheck.cpp | 6 +- 4 files changed, 109 insertions(+), 139 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b2ed9ea3c4..a2f88490f0 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -158,17 +158,17 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); // propose with sequence number, to implement linearizable semantics int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); // for compatibility, the same as syncPropose -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); extern int32_t sDebugFlag; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 5ea810180d..1116c1a905 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -155,102 +155,60 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { - /* - bool preMatch = false; - if (pMsg->prevLogIndex == SYNC_INDEX_INVALID && - ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) { - preMatch = true; - } - if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= - ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, - pMsg->prevLogIndex); assert(pPreEntry != NULL); if (pMsg->prevLogTerm == pPreEntry->term) { preMatch = true; + // preIndex = -1, or has preIndex entry in local log + assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + + // has extra entries (> preIndex) in local log + bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); + + // has entries in SyncAppendEntries msg + bool hasAppendEntries = pMsg->dataLen > 0; + + sTrace( + "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, " + "logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); + + if (hasExtraEntries && hasAppendEntries) { + // not conflict by default + bool conflict = false; + + SyncIndex extraIndex = pMsg->prevLogIndex + 1; + SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + assert(pExtraEntry != NULL); + + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + // log not match, conflict + assert(extraIndex == pAppendEntry->index); + if (pExtraEntry->term != pAppendEntry->term) { + conflict = true; + } + + if (conflict) { + // roll back + SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); + SyncIndex delEnd = extraIndex; + + sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); + + // notice! reverse roll back! + for (SyncIndex index = delEnd; index >= delBegin; --index) { + if (ths->pFsm->FpRollBackCb != NULL) { + SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + assert(pRollBackEntry != NULL); + + SRpcMsg rpcMsg; + syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); + ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); + rpcFreeCont(rpcMsg.pCont); + syncEntryDestory(pRollBackEntry); } - syncEntryDestory(pPreEntry); } - sTrace( - "syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "ths->state:%d, logOK:%d, preMatch:%d", - pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, preMatch); - - if (preMatch) { - */ - - { - // preIndex = -1, or has preIndex entry in local log - assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); - - // has extra entries (> preIndex) in local log - bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); - - // has entries in SyncAppendEntries msg - bool hasAppendEntries = pMsg->dataLen > 0; - - if (hasExtraEntries && hasAppendEntries) { - // not conflict by default - bool conflict = false; - - SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); - assert(pExtraEntry != NULL); - - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); - - // log not match, conflict - assert(extraIndex == pAppendEntry->index); - if (pExtraEntry->term != pAppendEntry->term) { - conflict = true; - } - - if (conflict) { - // roll back - SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); - SyncIndex delEnd = extraIndex; - - sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd); - - // notice! reverse roll back! - for (SyncIndex index = delEnd; index >= delBegin; --index) { - if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); - assert(pRollBackEntry != NULL); - - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state); - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pRollBackEntry); - } - } - - // delete confict entries - ths->pLogStore->truncate(ths->pLogStore, extraIndex); - - // append new entries - ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); - - // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); - } - } - rpcFreeCont(rpcMsg.pCont); - } - - // free memory - syncEntryDestory(pExtraEntry); - syncEntryDestory(pAppendEntry); - - } else if (hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else if (!hasExtraEntries && hasAppendEntries) { - SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + // delete confict entries + ths->pLogStore->truncate(ths->pLogStore, extraIndex); // append new entries ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); @@ -260,55 +218,62 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) { - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state); } } rpcFreeCont(rpcMsg.pCont); - - // free memory - syncEntryDestory(pAppendEntry); - - } else if (!hasExtraEntries && !hasAppendEntries) { - // do nothing - - } else { - assert(0); } - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = true; + // free memory + syncEntryDestory(pExtraEntry); + syncEntryDestory(pAppendEntry); - if (hasAppendEntries) { - pReply->matchIndex = pMsg->prevLogIndex + 1; - } else { - pReply->matchIndex = pMsg->prevLogIndex; - } + } else if (hasExtraEntries && !hasAppendEntries) { + // do nothing + } else if (!hasExtraEntries && hasAppendEntries) { + SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + assert(pAppendEntry != NULL); + + // append new entries + ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); + + // pre commit SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); + if (ths->pFsm != NULL) { + if (ths->pFsm->FpPreCommitCb != NULL) { + ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state); + } + } + rpcFreeCont(rpcMsg.pCont); - syncAppendEntriesReplyDestroy(pReply); + // free memory + syncEntryDestory(pAppendEntry); + + } else if (!hasExtraEntries && !hasAppendEntries) { + // do nothing + + } else { + assert(0); } - /* - else { - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); - pReply->srcId = ths->myRaftId; - pReply->destId = pMsg->srcId; - pReply->term = ths->pRaftStore->currentTerm; - pReply->success = false; - pReply->matchIndex = SYNC_INDEX_INVALID; + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = true; - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - } - */ + if (hasAppendEntries) { + pReply->matchIndex = pMsg->prevLogIndex + 1; + } else { + pReply->matchIndex = pMsg->prevLogIndex; + } + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); // maybe update commit index from leader if (pMsg->commitIndex > ths->commitIndex) { diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 89b5f0b39c..f581b31c4b 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { // update commit index newCommitIndex = index; + sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld", + newCommitIndex, pSyncNode->commitIndex); break; + } else { + sTrace( + "syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, " + "pSyncNode->pRaftStore->currentTerm:%lu", + pEntry->term, pSyncNode->pRaftStore->currentTerm); } } } diff --git a/source/libs/sync/test/syncLogStoreCheck.cpp b/source/libs/sync/test/syncLogStoreCheck.cpp index 5e5e43af01..85e39a61c5 100644 --- a/source/libs/sync/test/syncLogStoreCheck.cpp +++ b/source/libs/sync/test/syncLogStoreCheck.cpp @@ -26,7 +26,7 @@ SSyncFSM* pFsm; SWal* pWal; SSyncNode* pSyncNode; -SSyncNode* syncNodeInit(const char *path) { +SSyncNode* syncNodeInit(const char* path) { syncInfo.vgId = 1234; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; @@ -78,9 +78,7 @@ SSyncNode* syncNodeInit(const char *path) { return pSyncNode; } -SSyncNode* logStoreCheck(const char *path) { return syncNodeInit(path); } - - +SSyncNode* logStoreCheck(const char* path) { return syncNodeInit(path); } int main(int argc, char** argv) { // taosInitLog((char *)"syncTest.log", 100000, 10);