From 981461ea46bc93061b6892fcc5a6389c2d7489ee Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Nov 2022 10:25:31 +0800 Subject: [PATCH 1/8] refactor(sync): add PreSnapshotTest --- include/common/tmsgdef.h | 2 + include/libs/sync/syncTools.h | 31 ++++ source/libs/sync/src/syncMessage.c | 147 ++++++++++++++++++ source/libs/sync/test/CMakeLists.txt | 14 ++ source/libs/sync/test/syncPreSnapshotTest.cpp | 97 ++++++++++++ 5 files changed, 291 insertions(+) create mode 100644 source/libs/sync/test/syncPreSnapshotTest.cpp diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 571d14fe3c..b1cf716c06 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -270,6 +270,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT, "sync-pre-snapshot", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_PRE_SNAPSHOT_REPLY, "sync-pre-snapshot-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) #if defined(TD_MSG_NUMBER_) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index a1cff2b738..3d6b119757 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -513,6 +513,37 @@ void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg); void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg); void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg); +// --------------------------------------------- +typedef struct SyncPreSnapshot { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + +} SyncPreSnapshot; + +SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId); +void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg); +void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen); +void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg); +char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len); +SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len); +void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg); +void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg); +SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg); +char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg); + +// for debug ---------------------- +void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg); +void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg); +void syncPreSnapshotLog(const SyncPreSnapshot* pMsg); +void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg); + // --------------------------------------------- typedef struct SyncApplyMsg { uint32_t bytes; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index f9609d9c39..51a08d228d 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2315,6 +2315,153 @@ void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) { } } +// ---- message process SyncPreSnapshot---- +SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncPreSnapshot); + SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT; + return pMsg; +} + +void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPreSnapshotSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPreSnapshot* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncPreSnapshotDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPreSnapshotSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg) { + syncPreSnapshotDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPreSnapshot* pMsg = syncPreSnapshotDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPreSnapshot", pRoot); + return pJson; +} + +char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg) { + cJSON* pJson = syncPreSnapshot2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg) { + char* serialized = syncPreSnapshot2Str(pMsg); + printf("syncPreSnapshotPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg) { + char* serialized = syncPreSnapshot2Str(pMsg); + printf("syncPreSnapshotPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotLog(const SyncPreSnapshot* pMsg) { + char* serialized = syncPreSnapshot2Str(pMsg); + sTrace("syncPreSnapshotLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPreSnapshot2Str(pMsg); + sTrace("syncPreSnapshotLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // ---- message process SyncApplyMsg---- SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 26dd32942b..ce5152171d 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -60,6 +60,7 @@ add_executable(syncRaftCfgIndexTest "") add_executable(syncHeartbeatTest "") add_executable(syncHeartbeatReplyTest "") add_executable(syncLocalCmdTest "") +add_executable(syncPreSnapshotTest "") target_sources(syncTest @@ -310,6 +311,10 @@ target_sources(syncLocalCmdTest PRIVATE "syncLocalCmdTest.cpp" ) +target_sources(syncPreSnapshotTest + PRIVATE + "syncPreSnapshotTest.cpp" +) target_include_directories(syncTest @@ -622,6 +627,11 @@ target_include_directories(syncLocalCmdTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncPreSnapshotTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -872,6 +882,10 @@ target_link_libraries(syncLocalCmdTest sync gtest_main ) +target_link_libraries(syncPreSnapshotTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncPreSnapshotTest.cpp b/source/libs/sync/test/syncPreSnapshotTest.cpp new file mode 100644 index 0000000000..03894dfa84 --- /dev/null +++ b/source/libs/sync/test/syncPreSnapshotTest.cpp @@ -0,0 +1,97 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncPreSnapshot *createMsg() { + SyncPreSnapshot *pMsg = syncPreSnapshotBuild(789); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 9527; + return pMsg; +} + +void test1() { + SyncPreSnapshot *pMsg = createMsg(); + syncPreSnapshotLog2((char *)"test1:", pMsg); + syncPreSnapshotDestroy(pMsg); +} + +void test2() { + SyncPreSnapshot *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncPreSnapshotSerialize(pMsg, serialized, len); + SyncPreSnapshot *pMsg2 = syncPreSnapshotBuild(789); + syncPreSnapshotDeserialize(serialized, len, pMsg2); + syncPreSnapshotLog2((char *)"test2: syncPreSnapshotSerialize -> syncPreSnapshotDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncPreSnapshotDestroy(pMsg); + syncPreSnapshotDestroy(pMsg2); +} + +void test3() { + SyncPreSnapshot *pMsg = createMsg(); + uint32_t len; + char * serialized = syncPreSnapshotSerialize2(pMsg, &len); + SyncPreSnapshot *pMsg2 = syncPreSnapshotDeserialize2(serialized, len); + syncPreSnapshotLog2((char *)"test3: syncPreSnapshotSerialize2 -> syncPreSnapshotDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncPreSnapshotDestroy(pMsg); + syncPreSnapshotDestroy(pMsg2); +} + +void test4() { + SyncPreSnapshot *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPreSnapshot2RpcMsg(pMsg, &rpcMsg); + SyncPreSnapshot *pMsg2 = (SyncPreSnapshot *)taosMemoryMalloc(rpcMsg.contLen); + syncPreSnapshotFromRpcMsg(&rpcMsg, pMsg2); + syncPreSnapshotLog2((char *)"test4: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncPreSnapshotDestroy(pMsg); + syncPreSnapshotDestroy(pMsg2); +} + +void test5() { + SyncPreSnapshot *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPreSnapshot2RpcMsg(pMsg, &rpcMsg); + SyncPreSnapshot *pMsg2 = syncPreSnapshotFromRpcMsg2(&rpcMsg); + syncPreSnapshotLog2((char *)"test5: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncPreSnapshotDestroy(pMsg); + syncPreSnapshotDestroy(pMsg2); +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + gRaftDetailLog = true; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} From 69af9556db5e155a43e77264fc0b6632495f951d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Nov 2022 10:47:19 +0800 Subject: [PATCH 2/8] refactor(sync): add PreSnapshotReply --- include/libs/sync/syncTools.h | 32 ++++ source/libs/sync/src/syncMessage.c | 150 ++++++++++++++++++ source/libs/sync/test/CMakeLists.txt | 14 ++ .../sync/test/syncPreSnapshotReplyTest.cpp | 99 ++++++++++++ 4 files changed, 295 insertions(+) create mode 100644 source/libs/sync/test/syncPreSnapshotReplyTest.cpp diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 3d6b119757..5e9e70aa19 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -544,6 +544,38 @@ void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg); void syncPreSnapshotLog(const SyncPreSnapshot* pMsg); void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg); +// --------------------------------------------- +typedef struct SyncPreSnapshotReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex matchIndex; + +} SyncPreSnapshotReply; + +SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId); +void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen); +void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg); +char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len); +SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len); +void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg); +void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg); +SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg); +char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg); + +// for debug ---------------------- +void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg); +void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg); + // --------------------------------------------- typedef struct SyncApplyMsg { uint32_t bytes; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 51a08d228d..b50be7e6dc 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2462,6 +2462,156 @@ void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg) { } } +// ---- message process SyncPreSnapshotReply---- +SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncPreSnapshotReply); + SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY; + return pMsg; +} + +void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncPreSnapshotReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPreSnapshotReply* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncPreSnapshotReplyDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPreSnapshotReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg) { + syncPreSnapshotReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPreSnapshotReply* pMsg = syncPreSnapshotReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); + cJSON_AddStringToObject(pRoot, "match-index", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPreSnapshotReply", pRoot); + return pJson; +} + +char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg) { + cJSON* pJson = syncPreSnapshotReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + printf("syncPreSnapshotReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + printf("syncPreSnapshotReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + sTrace("syncPreSnapshotReplyLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncPreSnapshotReply2Str(pMsg); + sTrace("syncPreSnapshotReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // ---- message process SyncApplyMsg---- SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index ce5152171d..1b7ec7f0df 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -61,6 +61,7 @@ add_executable(syncHeartbeatTest "") add_executable(syncHeartbeatReplyTest "") add_executable(syncLocalCmdTest "") add_executable(syncPreSnapshotTest "") +add_executable(syncPreSnapshotReplyTest "") target_sources(syncTest @@ -315,6 +316,10 @@ target_sources(syncPreSnapshotTest PRIVATE "syncPreSnapshotTest.cpp" ) +target_sources(syncPreSnapshotReplyTest + PRIVATE + "syncPreSnapshotReplyTest.cpp" +) target_include_directories(syncTest @@ -632,6 +637,11 @@ target_include_directories(syncPreSnapshotTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncPreSnapshotReplyTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -886,6 +896,10 @@ target_link_libraries(syncPreSnapshotTest sync gtest_main ) +target_link_libraries(syncPreSnapshotReplyTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncPreSnapshotReplyTest.cpp b/source/libs/sync/test/syncPreSnapshotReplyTest.cpp new file mode 100644 index 0000000000..7fa5c68a1d --- /dev/null +++ b/source/libs/sync/test/syncPreSnapshotReplyTest.cpp @@ -0,0 +1,99 @@ + +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncPreSnapshotReply *createMsg() { + SyncPreSnapshotReply *pMsg = syncPreSnapshotReplyBuild(789); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 9527; + pMsg->matchIndex = 12306; + return pMsg; +} + +void test1() { + SyncPreSnapshotReply *pMsg = createMsg(); + syncPreSnapshotReplyLog2((char *)"test1:", pMsg); + syncPreSnapshotReplyDestroy(pMsg); +} + +void test2() { + SyncPreSnapshotReply *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncPreSnapshotReplySerialize(pMsg, serialized, len); + SyncPreSnapshotReply *pMsg2 = syncPreSnapshotReplyBuild(789); + syncPreSnapshotReplyDeserialize(serialized, len, pMsg2); + syncPreSnapshotReplyLog2((char *)"test2: syncPreSnapshotReplySerialize -> syncPreSnapshotReplyDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncPreSnapshotReplyDestroy(pMsg); + syncPreSnapshotReplyDestroy(pMsg2); +} + +void test3() { + SyncPreSnapshotReply *pMsg = createMsg(); + uint32_t len; + char * serialized = syncPreSnapshotReplySerialize2(pMsg, &len); + SyncPreSnapshotReply *pMsg2 = syncPreSnapshotReplyDeserialize2(serialized, len); + syncPreSnapshotReplyLog2((char *)"test3: syncPreSnapshotReplySerialize2 -> syncPreSnapshotReplyDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncPreSnapshotReplyDestroy(pMsg); + syncPreSnapshotReplyDestroy(pMsg2); +} + +void test4() { + SyncPreSnapshotReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPreSnapshotReply2RpcMsg(pMsg, &rpcMsg); + SyncPreSnapshotReply *pMsg2 = (SyncPreSnapshotReply *)taosMemoryMalloc(rpcMsg.contLen); + syncPreSnapshotReplyFromRpcMsg(&rpcMsg, pMsg2); + syncPreSnapshotReplyLog2((char *)"test4: syncPreSnapshotReply2RpcMsg -> syncPreSnapshotReplyFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncPreSnapshotReplyDestroy(pMsg); + syncPreSnapshotReplyDestroy(pMsg2); +} + +void test5() { + SyncPreSnapshotReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPreSnapshotReply2RpcMsg(pMsg, &rpcMsg); + SyncPreSnapshotReply *pMsg2 = syncPreSnapshotReplyFromRpcMsg2(&rpcMsg); + syncPreSnapshotReplyLog2((char *)"test5: syncPreSnapshotReply2RpcMsg -> syncPreSnapshotReplyFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncPreSnapshotReplyDestroy(pMsg); + syncPreSnapshotReplyDestroy(pMsg2); +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + gRaftDetailLog = true; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} From 3fbb51346b5dada50a14c26ceb34ce15388a718f Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Nov 2022 10:52:25 +0800 Subject: [PATCH 3/8] refactor(sync): modify syncEnvTest --- source/libs/sync/test/syncEnvTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 404ab32d58..a220c2aed4 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -26,12 +26,12 @@ int main() { assert(ret == 0); for (int i = 0; i < 5; ++i) { - ret = syncEnvStartTimer(); + //ret = syncEnvStartTimer(); assert(ret == 0); taosMsleep(5000); - ret = syncEnvStopTimer(); + //ret = syncEnvStopTimer(); assert(ret == 0); taosMsleep(5000); From fd07a611a996f6f69ef3a7347bc99e2ee04e19bf Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Nov 2022 11:30:40 +0800 Subject: [PATCH 4/8] refactor(sync): add some log --- source/libs/sync/inc/syncInt.h | 7 +++++- source/libs/sync/src/syncMain.c | 41 ++++++++++++++++++++++++++++++++- 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 843dee1342..f1bdcf83f2 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -315,6 +315,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode); // trace log void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s); +void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s); @@ -337,7 +338,11 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s); -void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s); +void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); +void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s); + +void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); +void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 139e4be1b9..8f5f0eef89 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3676,4 +3676,43 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd, syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s); syncNodeEventLog(pSyncNode, logBuf); -} \ No newline at end of file +} + +void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "send sync-pre-snapshot to %s:%d {term:%" PRIu64 "}, %s", host, port, pMsg->term, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-pre-snapshot from %s:%d {term:%" PRIu64 "}, %s", host, port, pMsg->term, + s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->matchIndex, s); + syncNodeEventLog(pSyncNode, logBuf); +} + +void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->matchIndex, s); + syncNodeEventLog(pSyncNode, logBuf); +} From 21d60cb5be0484ad52b7e0354c3af7c7c4e01e72 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Nov 2022 11:46:29 +0800 Subject: [PATCH 5/8] refactor(sync): pre snapshot on message --- include/libs/sync/syncTools.h | 3 +++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 ++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 3 +++ source/libs/sync/src/syncSnapshot.c | 10 ++++++++++ 4 files changed, 18 insertions(+) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index e39d45b061..6a6838e388 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -785,6 +785,9 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); +int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); + int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 12e678886d..e0cc7d2dfb 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -197,6 +197,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d682fa9cc5..1d6728e18b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -436,6 +436,9 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_PRE_SNAPSHOT_REPLY, vmPutMsgToSyncQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, vmPutMsgToSyncCtrlQueue, 0) == NULL) goto _OVER; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index a7bafa9f28..ef18b29983 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -922,3 +922,13 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { return 0; } + +int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { + syncLogRecvSyncPreSnapshot(ths, pMsg, ""); + return 0; +} + +int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { + syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); + return 0; +} \ No newline at end of file From 59448cd18ecf278dc83c87a108981619031caa1c Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Nov 2022 15:20:08 +0800 Subject: [PATCH 6/8] refactor(sync): pre snapshot on message --- include/libs/sync/syncTools.h | 2 +- source/libs/sync/src/syncMain.c | 10 ++-- source/libs/sync/src/syncMessage.c | 4 +- source/libs/sync/src/syncSnapshot.c | 50 +++++++++++++++++++ .../sync/test/syncPreSnapshotReplyTest.cpp | 2 +- 5 files changed, 60 insertions(+), 8 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 6a6838e388..24f1196bed 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -550,7 +550,7 @@ typedef struct SyncPreSnapshotReply { // private data SyncTerm term; - SyncIndex matchIndex; + SyncIndex snapStart; } SyncPreSnapshotReply; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8f5f0eef89..b70d6d5f09 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3702,8 +3702,9 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot uint16_t port; syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->matchIndex, s); + snprintf(logBuf, sizeof(logBuf), + "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->snapStart, s); syncNodeEventLog(pSyncNode, logBuf); } @@ -3712,7 +3713,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot uint16_t port; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", - host, port, pMsg->term, pMsg->matchIndex, s); + snprintf(logBuf, sizeof(logBuf), + "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port, + pMsg->term, pMsg->snapStart, s); syncNodeEventLog(pSyncNode, logBuf); } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index b50be7e6dc..d070d3e744 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2568,8 +2568,8 @@ cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); - cJSON_AddStringToObject(pRoot, "match-index", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart); + cJSON_AddStringToObject(pRoot, "snap-start", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index ef18b29983..6706e2f213 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -925,10 +925,60 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { syncLogRecvSyncPreSnapshot(ths, pMsg, ""); + + SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId); + pMsgReply->srcId = ths->myRaftId; + pMsgReply->destId = pMsg->srcId; + pMsgReply->term = ths->pRaftStore->currentTerm; + + SSyncLogStoreData *pData = ths->pLogStore->data; + SWal *pWal = pData->pWal; + + if (syncNodeIsMnode(ths)) { + pMsgReply->snapStart = SYNC_INDEX_BEGIN; + + } else { + bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore); + int64_t walCommitVer = walGetCommittedVer(pWal); + + if (!isEmpty && ths->commitIndex != walCommitVer) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", + walCommitVer, ths->commitIndex); + syncNodeErrorLog(ths, logBuf); + + goto _IGNORE; + } + + pMsgReply->snapStart = ths->commitIndex + 1; + + // make local log clean + int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart); + if (code != 0) { + syncNodeErrorLog(ths, "truncate wal error"); + goto _IGNORE; + } + } + + // can not write behind _RESPONSE + SRpcMsg rpcMsg; + +_RESPONSE: + syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + + syncPreSnapshotReplyDestroy(pMsgReply); + return 0; + +_IGNORE: + syncPreSnapshotReplyDestroy(pMsgReply); return 0; } int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); + + // start snapshot + return 0; } \ No newline at end of file diff --git a/source/libs/sync/test/syncPreSnapshotReplyTest.cpp b/source/libs/sync/test/syncPreSnapshotReplyTest.cpp index 7fa5c68a1d..a30dcc2c54 100644 --- a/source/libs/sync/test/syncPreSnapshotReplyTest.cpp +++ b/source/libs/sync/test/syncPreSnapshotReplyTest.cpp @@ -22,7 +22,7 @@ SyncPreSnapshotReply *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->term = 9527; - pMsg->matchIndex = 12306; + pMsg->snapStart = 12306; return pMsg; } From f7191d2ffdc03a0d292072df94030c6cdfa65479 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 2 Nov 2022 15:25:14 +0800 Subject: [PATCH 7/8] refactor(sync): add pre snapshot --- source/libs/sync/inc/syncInt.h | 9 +- source/libs/sync/inc/syncSnapshot.h | 25 +- source/libs/sync/src/syncAppendEntries.c | 1 - source/libs/sync/src/syncAppendEntriesReply.c | 2 + source/libs/sync/src/syncMain.c | 10 + source/libs/sync/src/syncMessage.c | 11 +- source/libs/sync/src/syncReplication.c | 4 +- source/libs/sync/src/syncSnapshot.c | 488 +++++++++++------- .../sync/test/syncSnapshotReceiverTest.cpp | 1 - source/libs/sync/test/syncSnapshotRspTest.cpp | 2 +- .../libs/sync/test/syncSnapshotSendTest.cpp | 1 - .../libs/sync/test/syncSnapshotSenderTest.cpp | 3 +- 12 files changed, 359 insertions(+), 198 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index f1bdcf83f2..b5c01bf388 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -22,9 +22,8 @@ extern "C" { #include "sync.h" #include "syncTools.h" -#include "tlog.h" -#include "ttimer.h" #include "taosdef.h" +#include "tlog.h" #include "ttimer.h" // clang-format off @@ -344,6 +343,12 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s); +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s); + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index b8b7af2dda..760fc8ac73 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -28,10 +28,11 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" -#define SYNC_SNAPSHOT_SEQ_INVALID -1 -#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2 -#define SYNC_SNAPSHOT_SEQ_BEGIN 0 -#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF +#define SYNC_SNAPSHOT_SEQ_INVALID -2 +#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3 +#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1 +#define SYNC_SNAPSHOT_SEQ_BEGIN 0 +#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF #define SYNC_SNAPSHOT_RETRY_MS 5000 @@ -47,19 +48,19 @@ typedef struct SSyncSnapshotSender { SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode *pSyncNode; - int32_t replicaIndex; SyncTerm term; - SyncTerm privateTerm; int64_t startTime; bool finish; + + // init when create + SSyncNode *pSyncNode; + int32_t replicaIndex; } SSyncSnapshotSender; SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotSenderDestroy(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot, - void *pReader); +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); @@ -76,11 +77,13 @@ typedef struct SSyncSnapshotReceiver { int32_t ack; void *pWriter; SyncTerm term; - SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode *pSyncNode; + int64_t startTime; + + // init when create + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 7dab112a51..4638475e71 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -142,7 +142,6 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { // pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); pReply->matchIndex = SYNC_INDEX_INVALID; pReply->lastSendIndex = pMsg->prevLogIndex + 1; - pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->startTime = ths->startTime; if (pMsg->term < ths->pRaftStore->currentTerm) { diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5e6c9f1534..5d54db5b07 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -73,6 +73,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); ASSERT(code == 0); +#if 0 if (pMsg->privateTerm < pSender->privateTerm) { ASSERT(pReader != NULL); snapshotSenderStart(pSender, readerParam, snapshot, pReader); @@ -82,6 +83,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); } } +#endif } int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b70d6d5f09..d53cdc99f8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2355,6 +2355,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // init peer mgr syncNodePeerStateInit(pSyncNode); +#if 0 // update sender private term SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId)); if (pMySender != NULL) { @@ -2365,6 +2366,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { } (pMySender->privateTerm) += 100; } +#endif // close receiver if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) { @@ -3718,3 +3720,11 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot pMsg->term, pMsg->snapStart, s); syncNodeEventLog(pSyncNode, logBuf); } + +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {} + +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {} + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {} + +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {} diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index d070d3e744..ba81e61cb2 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2873,8 +2873,8 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->beginIndex); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); @@ -3048,8 +3048,8 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastIndex); cJSON_AddStringToObject(pRoot, "lastIndex", u64buf); @@ -3059,6 +3059,9 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) { cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack); cJSON_AddNumberToObject(pRoot, "code", pMsg->code); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapBeginIndex); + cJSON_AddStringToObject(pRoot, "snap-begin", u64buf); } cJSON* pJson = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index af53123421..4aa8b0bc34 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -62,8 +62,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { syncNodeEventLog(pSyncNode, logBuf); // start snapshot - int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); - ASSERT(code == 0); + // int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); + return 0; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 6706e2f213..b728315746 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -41,8 +41,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI } memset(pSender, 0, sizeof(*pSender)); - int64_t timeNow = taosGetTimestampMs(); - pSender->start = false; pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; @@ -53,8 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; - pSender->privateTerm = timeNow + 100; - pSender->startTime = timeNow; + pSender->startTime = 0; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { @@ -88,88 +85,30 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } -// begin send snapshot by param, snapshot, pReader -// -// action: -// 1. assert reader not start -// 2. update state -// 3. send first snapshot block -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot, - void *pReader) { +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ASSERT(!snapshotSenderIsStart(pSender)); - // init snapshot, parm, reader - ASSERT(pSender->pReader == NULL); - pSender->pReader = pReader; - pSender->snapshot = snapshot; - pSender->snapshotParam = snapshotParam; - - // init current block - if (pSender->pCurrentBlock != NULL) { - taosMemoryFree(pSender->pCurrentBlock); - } - pSender->blockLen = 0; - - // update term - pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; - ++(pSender->privateTerm); // increase private term - - // update state - pSender->finish = false; pSender->start = true; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; + pSender->pReader = NULL; + pSender->pCurrentBlock = NULL; + pSender->blockLen = 0; - // init last config - if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { - int32_t code = 0; - SSyncRaftEntry *pEntry = NULL; - bool getLastConfig = false; + pSender->snapshotParam.start = SYNC_INDEX_INVALID; + pSender->snapshotParam.end = SYNC_INDEX_INVALID; - code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore, - pSender->snapshot.lastConfigIndex, &pEntry); - if (code == 0 && pEntry != NULL) { - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + pSender->snapshot.data = NULL; + pSender->snapshotParam.end = SYNC_INDEX_INVALID; + pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; + pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID; + pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; - SSyncCfg lastConfig; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig); - ASSERT(ret == 0); - pSender->lastConfig = lastConfig; - getLastConfig = true; - - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); - } else { - if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) { - sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId); - pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg; - getLastConfig = true; - } - } - - // last config not found in wal, update to -1 - if (!getLastConfig) { - SyncIndex oldLastConfigIndex = pSender->snapshot.lastConfigIndex; - SyncIndex newLastConfigIndex = SYNC_INDEX_INVALID; - pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; - memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); - - // event log - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "snapshot sender update lcindex from %" PRId64 " to %" PRId64, - oldLastConfigIndex, newLastConfigIndex); - char *eventLog = snapshotSender2SimpleStr(pSender, logBuf); - syncNodeEventLog(pSender->pSyncNode, eventLog); - taosMemoryFree(eventLog); - } while (0); - } - - } else { - // no last config - memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); - } + memset(&(pSender->lastConfig), 0, sizeof(pSender->lastConfig)); + pSender->sendingMS = 0; + pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; + pSender->startTime = taosGetTimestampMs(); + pSender->finish = false; // build begin msg SyncSnapshotSend *pMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); @@ -181,8 +120,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; - pMsg->seq = pSender->seq; // SYNC_SNAPSHOT_SEQ_BEGIN - pMsg->privateTerm = pSender->privateTerm; + pMsg->startTime = pSender->startTime; + pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; // send msg SRpcMsg rpcMsg; @@ -201,6 +140,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho } int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { + // update flag + pSender->start = false; + pSender->finish = finish; + // close reader if (pSender->pReader != NULL) { int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); @@ -215,12 +158,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { pSender->blockLen = 0; } - // update flag - pSender->start = false; - pSender->finish = finish; - - // do not update term, maybe print - // event log do { char *eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender stop"); @@ -263,7 +200,9 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; pMsg->seq = pSender->seq; - pMsg->privateTerm = pSender->privateTerm; + + // pMsg->privateTerm = pSender->privateTerm; + memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); // send msg @@ -302,7 +241,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfig = pSender->lastConfig; pMsg->seq = pSender->seq; - pMsg->privateTerm = pSender->privateTerm; + + // pMsg->privateTerm = pSender->privateTerm; + memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); // send msg @@ -367,8 +308,10 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + // snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm); + // cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + cJSON_AddNumberToObject(pRoot, "finish", pSender->finish); } @@ -395,30 +338,38 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { snprintf(s, len, "%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64 - " seq:%d ack:%d finish:%d pterm:%" PRIu64 - " " - "replica-index:%d %s:%d}", + " seq:%d ack:%d finish:%d replica-index:%d %s:%d}", event, pSender, pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, - pSender->finish, pSender->privateTerm, pSender->replicaIndex, host, port); + pSender->finish, pSender->replicaIndex, host, port); return s; } int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { - // calculate index - - syncNodeEventLog(pSyncNode, "start snapshot ..."); + syncNodeEventLog(pSyncNode, "starting snapshot ..."); SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { - // create sender - } else { - // if is same - // return 0; + syncNodeErrorLog(pSyncNode, "start snapshot error, sender is null"); + return -1; } - // send begin msg + int32_t code = 0; + + if (snapshotSenderIsStart(pSender)) { + code = snapshotSenderStop(pSender, false); + if (code != 0) { + syncNodeErrorLog(pSyncNode, "snapshot sender stop error"); + return -1; + } + } + + code = snapshotSenderStart(pSender); + if (code != 0) { + syncNodeErrorLog(pSyncNode, "snapshot sender start error"); + return -1; + } return 0; } @@ -440,7 +391,6 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->pSyncNode = pSyncNode; pReceiver->fromId = fromId; pReceiver->term = pSyncNode->pRaftStore->currentTerm; - pReceiver->privateTerm = 0; pReceiver->snapshot.data = NULL; pReceiver->snapshot.lastApplyIndex = SYNC_INDEX_INVALID; pReceiver->snapshot.lastApplyTerm = 0; @@ -474,19 +424,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceive // receive first snapshot data // write first block data static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { - // update state - pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; - pReceiver->privateTerm = pBeginMsg->privateTerm; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; - pReceiver->fromId = pBeginMsg->srcId; pReceiver->start = true; - - // update snapshot - pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; - pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; - pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; - pReceiver->snapshotParam.start = pBeginMsg->beginIndex; - pReceiver->snapshotParam.end = pBeginMsg->lastIndex; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; // start writer ASSERT(pReceiver->pWriter == NULL); @@ -494,6 +433,19 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh &(pReceiver->snapshotParam), &(pReceiver->pWriter)); ASSERT(ret == 0); + pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + pReceiver->snapshotParam.start = pBeginMsg->beginIndex; + pReceiver->snapshotParam.end = pBeginMsg->lastIndex; + + pReceiver->fromId = pBeginMsg->srcId; + + // update snapshot + pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; + pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; + pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; + + pReceiver->startTime = pBeginMsg->startTime; + // event log do { char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver start"); @@ -523,22 +475,9 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { } // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver -// if already start, force close, start again int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { - if (!snapshotReceiverIsStart(pReceiver)) { - // first start - snapshotReceiverDoStart(pReceiver, pBeginMsg); - - } else { - // already start - sInfo("vgId:%d, snapshot recv, receiver already start", pReceiver->pSyncNode->vgId); - - // force close, abandon incomplete data - snapshotReceiverForceStop(pReceiver); - - // start again - snapshotReceiverDoStart(pReceiver, pBeginMsg); - } + ASSERT(!snapshotReceiverIsStart(pReceiver)); + snapshotReceiverDoStart(pReceiver, pBeginMsg); return 0; } @@ -698,8 +637,8 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term); cJSON_AddStringToObject(pRoot, "term", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); } cJSON *pJson = cJSON_CreateObject(); @@ -724,38 +663,204 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) syncUtilU642Addr(fromId.addr, host, sizeof(host), &port); snprintf(s, len, - "%s {%p start:%d ack:%d term:%" PRIu64 " pterm:%" PRIu64 " from:%s:%d s-param:%" PRId64 " e-param:%" PRId64 - " laindex:%" PRId64 " laterm:%" PRIu64 + "%s {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64 + " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " " "lcindex:%" PRId64 "}", - event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port, + event, pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start, pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex); return s; } +SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { + SyncIndex snapStart = SYNC_INDEX_INVALID; + + if (syncNodeIsMnode(ths)) { + snapStart = SYNC_INDEX_BEGIN; + + } else { + SSyncLogStoreData *pData = ths->pLogStore->data; + SWal *pWal = pData->pWal; + + bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore); + int64_t walCommitVer = walGetCommittedVer(pWal); + + if (!isEmpty && ths->commitIndex != walCommitVer) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore", + walCommitVer, ths->commitIndex); + syncNodeErrorLog(ths, logBuf); + + snapStart = walCommitVer + 1; + } else { + snapStart = ths->commitIndex + 1; + } + } + + return snapStart; +} + +static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + + if (snapshotReceiverIsStart(pReceiver)) { + // already start + + if (pMsg->startTime > pReceiver->startTime) { + goto _START_RECEIVER; + + } else if (pMsg->startTime == pReceiver->startTime) { + goto _SEND_REPLY; + + } else { + // ignore + return 0; + } + + } else { + // start new + goto _START_RECEIVER; + } + +_START_RECEIVER: + if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { + syncNodeErrorLog(pSyncNode, "snapshot receiver time skew too much"); + return -1; + } else { + // waiting for clock match + while (taosGetTimestampMs() > pMsg->startTime) { + taosMsleep(10); + } + + snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender + } + +_SEND_REPLY: + // build msg + ; // make complier happy + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->startTime = pReceiver->startTime; + pRspMsg->ack = pMsg->seq; // receiver maybe already closed + pRspMsg->code = 0; + pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode); + + // send msg + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncSnapshotRspDestroy(pRspMsg); + + return 0; +} + +static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { + // condition 1 + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + + if (snapshotReceiverIsStart(pReceiver)) { + if (pMsg->startTime > pReceiver->startTime) { + snapshotReceiverStop(pReceiver); + + } else if (pMsg->startTime == pReceiver->startTime) { + return 0; + } else { + // ignore + syncNodeEventLog(pSyncNode, "msg ignore"); + return 0; + } + } + +_START_RECEIVER: + if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { + syncNodeErrorLog(pSyncNode, "snapshot receiver time skew too much"); + return -1; + } else { + // waiting for clock match + while (taosGetTimestampMs() > pMsg->startTime) { + taosMsleep(10); + } + + snapshotReceiverStart(pReceiver, pMsg); + + // build msg + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->ack = pReceiver->ack; // receiver maybe already closed + pRspMsg->code = 0; + + // send msg + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncSnapshotRspDestroy(pRspMsg); + } + + return 0; +} + +static int32_t syncNodeOnSnapshotTransfer(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; } + +static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; } + // receiver on message // -// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm -// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig) -// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close -// condition 4, got data, update ack +// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT +// if receiver already start +// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start) +// if sender.start-time = receiver.start-time, maybe duplicate msg +// if sender.start-time < receiver.start-time, ignore +// else +// waiting for clock match +// start receiver(reply snapshot start) +// +// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN +// a. create writer with +// +// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig) +// +// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close +// +// condition 5, got data, update ack // int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { - // get receiver - SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - bool needRsp = false; + // if already drop replica, do not process + if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) { + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config"); + return 0; + } + + if (pMsg->term < pSyncNode->pRaftStore->currentTerm) { + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject, small term"); + return 0; + } + + if (pMsg->term > pSyncNode->pRaftStore->currentTerm) { + syncNodeStepDown(pSyncNode, pMsg->term); + } + syncNodeResetElectTimer(pSyncNode); + int32_t code = 0; + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { - if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { - // condition 1 - // begin, no data - snapshotReceiverStart(pReceiver, pMsg); - needRsp = true; + if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + syncNodeOnSnapshotPre(pSyncNode, pMsg); + + } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { + syncNodeOnSnapshotBegin(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { // condition 2 @@ -764,7 +869,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (code == 0) { snapshotReceiverStop(pReceiver); } - needRsp = true; + bool needRsp = true; // maybe update lastconfig if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { @@ -782,7 +887,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // condition 3 // force close snapshotReceiverForceStop(pReceiver); - needRsp = false; + bool needRsp = false; } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { // condition 4 @@ -790,7 +895,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (pMsg->seq == pReceiver->ack + 1) { snapshotReceiverGotData(pReceiver, pMsg); } - needRsp = true; + bool needRsp = true; } else { // error log @@ -805,26 +910,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return -1; } - // send ack - if (needRsp) { - // build msg - SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pReceiver->ack; // receiver maybe already closed - pRspMsg->code = 0; - pRspMsg->privateTerm = pReceiver->privateTerm; // receiver maybe already closed - - // send msg - SRpcMsg rpcMsg; - syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); - syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); - syncSnapshotRspDestroy(pRspMsg); - } - } else { // error log do { @@ -849,6 +934,52 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; } +int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { + // get sender + SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); + ASSERT(pSender != NULL); + + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + + // prepare + pSender->snapshotParam.start = pMsg->snapBeginIndex; + pSender->snapshotParam.end = snapshot.lastApplyIndex; + + if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) { + syncNodeErrorLog(pSyncNode, "snapshot last index too small"); + return -1; + } + + // start reader + int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader)); + if (code != 0) { + syncNodeErrorLog(pSyncNode, "create snapshot reader error"); + return -1; + } + + // build begin msg + SyncSnapshotSend *pSendMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); + pSendMsg->srcId = pSender->pSyncNode->myRaftId; + pSendMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; + pSendMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; + pSendMsg->beginIndex = pSender->snapshotParam.start; + pSendMsg->lastIndex = pSender->snapshot.lastApplyIndex; + pSendMsg->lastTerm = pSender->snapshot.lastApplyTerm; + pSendMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; + pSendMsg->lastConfig = pSender->lastConfig; + pSendMsg->startTime = pSender->startTime; + pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + + // send msg + SRpcMsg rpcMsg; + syncSnapshotSend2RpcMsg(pSendMsg, &rpcMsg); + syncNodeSendMsgById(&(pSendMsg->destId), pSender->pSyncNode, &rpcMsg); + syncSnapshotSendDestroy(pSendMsg); + + return 0; +} + // sender on message // // condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender @@ -857,8 +988,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { // if already drop replica, do not process - if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - sError("vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped", pSyncNode->vgId); + if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped"); return -1; } @@ -866,17 +997,26 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId)); ASSERT(pSender != NULL); + if (pMsg->startTime != pSender->startTime) { + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender/receiver start time not match"); + return -1; + } + // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { - // condition 1 + // prepare , send begin msg + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) { + syncNodeOnSnapshotReplyPre(pSyncNode, pMsg); + return 0; + } + // receive ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { snapshotSenderStop(pSender, true); return 0; } - // condition 2 // send next msg if (pMsg->ack == pSender->seq) { // update sender ack diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index 0f8e76f121..0a96c7dadc 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -50,7 +50,6 @@ SSyncSnapshotReceiver* createReceiver() { pReceiver->ack = 20; pReceiver->pWriter = (void*)0x11; pReceiver->term = 66; - pReceiver->privateTerm = 99; return pReceiver; } diff --git a/source/libs/sync/test/syncSnapshotRspTest.cpp b/source/libs/sync/test/syncSnapshotRspTest.cpp index 89348da617..63905c2182 100644 --- a/source/libs/sync/test/syncSnapshotRspTest.cpp +++ b/source/libs/sync/test/syncSnapshotRspTest.cpp @@ -21,7 +21,7 @@ SyncSnapshotRsp *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->term = 11; - pMsg->privateTerm = 99; + pMsg->startTime = 99; pMsg->lastIndex = 22; pMsg->lastTerm = 33; pMsg->ack = 44; diff --git a/source/libs/sync/test/syncSnapshotSendTest.cpp b/source/libs/sync/test/syncSnapshotSendTest.cpp index 6fcfa6f6c4..83f1dfebb3 100644 --- a/source/libs/sync/test/syncSnapshotSendTest.cpp +++ b/source/libs/sync/test/syncSnapshotSendTest.cpp @@ -21,7 +21,6 @@ SyncSnapshotSend *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->term = 11; - pMsg->privateTerm = 99; pMsg->lastIndex = 22; pMsg->lastTerm = 33; diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index 8d1f83b3b1..010757e724 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -55,7 +55,8 @@ SSyncSnapshotSender* createSender() { pSender->snapshot.lastApplyTerm = 88; pSender->sendingMS = 77; pSender->term = 66; - pSender->privateTerm = 99; + + //pSender->privateTerm = 99; return pSender; } From 3072afbacb62433dba0ae32d6d7ebc3981e295a6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 2 Nov 2022 15:25:26 +0800 Subject: [PATCH 8/8] refactor(sync): add pre snapshot --- include/libs/sync/sync.h | 1 + include/libs/sync/syncTools.h | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1a94dcf426..e8a50cb1f7 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -39,6 +39,7 @@ extern bool gRaftDetailLog; #define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_VNODE_LOG_RETENTION 100 +#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 24f1196bed..5bddf6a800 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -617,7 +617,7 @@ typedef struct SyncSnapshotSend { SyncTerm lastTerm; // snapshot.lastTerm SyncIndex lastConfigIndex; // snapshot.lastConfigIndex SSyncCfg lastConfig; - SyncTerm privateTerm; + int64_t startTime; int32_t seq; uint32_t dataLen; char data[]; @@ -652,9 +652,10 @@ typedef struct SyncSnapshotRsp { SyncTerm term; SyncIndex lastIndex; SyncTerm lastTerm; - SyncTerm privateTerm; + int64_t startTime; int32_t ack; int32_t code; + SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid } SyncSnapshotRsp; SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId);