diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 006ba7f21b..33d01bc46d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -261,6 +261,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_BATCH, "sync-append-entries-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 6c95c3c6d7..09e24477d7 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -444,6 +444,52 @@ void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); +// --------------------------------------------- +typedef struct SyncHeartbeat { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncIndex commitIndex; + SyncTerm privateTerm; +} SyncHeartbeat; + +SyncHeartbeat* syncHeartbeatBuild(int32_t vgId); +void syncHeartbeatDestroy(SyncHeartbeat* pMsg); +void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen); +void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg); +char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len); +SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len); +void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg); +void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg); +SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg); +char* syncHeartbeat2Str(const SyncHeartbeat* pMsg); + +// for debug ---------------------- +void syncHeartbeatPrint(const SyncHeartbeat* pMsg); +void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg); +void syncHeartbeatLog(const SyncHeartbeat* pMsg); +void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg); + +// --------------------------------------------- +typedef struct SyncHeartbeatReply { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + // private data + SyncTerm term; + SyncTerm privateTerm; + int64_t startTime; +} SyncHeartbeatReply; + // --------------------------------------------- typedef struct SyncApplyMsg { uint32_t bytes; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index b42aba560f..a16f87059f 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1992,6 +1992,159 @@ void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) { } } +// ---- message process SyncHeartbeat---- +SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncHeartbeat); + SyncHeartbeat* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES; + return pMsg; +} + +void syncHeartbeatDestroy(SyncHeartbeat* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncHeartbeatSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncHeartbeat* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncHeartbeatDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) { + syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commitIndex", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot); + return pJson; +} + +char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) { + cJSON* pJson = syncHeartbeat2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncHeartbeatPrint(const SyncHeartbeat* pMsg) { + char* serialized = syncHeartbeat2Str(pMsg); + printf("syncHeartbeatPrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) { + char* serialized = syncHeartbeat2Str(pMsg); + printf("syncHeartbeatPrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncHeartbeatLog(const SyncHeartbeat* pMsg) { + char* serialized = syncHeartbeat2Str(pMsg); + sTrace("syncHeartbeatLog | len:%" PRIu64 " | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncHeartbeat2Str(pMsg); + sTrace("syncHeartbeatLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} + // ---- message process SyncApplyMsg---- SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SyncApplyMsg) + dataLen; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 72845d0c1d..e2be1523c0 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -57,6 +57,7 @@ add_executable(syncLeaderTransferTest "") add_executable(syncReconfigFinishTest "") add_executable(syncRestoreFromSnapshot "") add_executable(syncRaftCfgIndexTest "") +add_executable(syncHeartbeatTest "") target_sources(syncTest @@ -295,6 +296,10 @@ target_sources(syncRaftCfgIndexTest PRIVATE "syncRaftCfgIndexTest.cpp" ) +target_sources(syncHeartbeatTest + PRIVATE + "syncHeartbeatTest.cpp" +) target_include_directories(syncTest @@ -592,6 +597,11 @@ target_include_directories(syncRaftCfgIndexTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncHeartbeatTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -830,6 +840,10 @@ target_link_libraries(syncRaftCfgIndexTest sync gtest_main ) +target_link_libraries(syncHeartbeatTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncHeartbeatTest.cpp b/source/libs/sync/test/syncHeartbeatTest.cpp new file mode 100644 index 0000000000..d910c828f1 --- /dev/null +++ b/source/libs/sync/test/syncHeartbeatTest.cpp @@ -0,0 +1,99 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncHeartbeat *createMsg() { + SyncHeartbeat *pMsg = syncHeartbeatBuild(789); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 8; + pMsg->commitIndex = 33; + pMsg->privateTerm = 44; + return pMsg; +} + +void test1() { + SyncHeartbeat *pMsg = createMsg(); + syncHeartbeatLog2((char *)"test1:", pMsg); + syncHeartbeatDestroy(pMsg); +} + +void test2() { + SyncHeartbeat *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncHeartbeatSerialize(pMsg, serialized, len); + SyncHeartbeat *pMsg2 = syncHeartbeatBuild(789); + syncHeartbeatDeserialize(serialized, len, pMsg2); + syncHeartbeatLog2((char *)"test2: syncHeartbeatSerialize -> syncHeartbeatDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncHeartbeatDestroy(pMsg); + syncHeartbeatDestroy(pMsg2); +} + +void test3() { + SyncHeartbeat *pMsg = createMsg(); + uint32_t len; + char * serialized = syncHeartbeatSerialize2(pMsg, &len); + SyncHeartbeat *pMsg2 = syncHeartbeatDeserialize2(serialized, len); + syncHeartbeatLog2((char *)"test3: syncHeartbeatSerialize2 -> syncHeartbeatDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncHeartbeatDestroy(pMsg); + syncHeartbeatDestroy(pMsg2); +} + +void test4() { + SyncHeartbeat *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncHeartbeat2RpcMsg(pMsg, &rpcMsg); + SyncHeartbeat *pMsg2 = (SyncHeartbeat *)taosMemoryMalloc(rpcMsg.contLen); + syncHeartbeatFromRpcMsg(&rpcMsg, pMsg2); + syncHeartbeatLog2((char *)"test4: syncHeartbeat2RpcMsg -> syncHeartbeatFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncHeartbeatDestroy(pMsg); + syncHeartbeatDestroy(pMsg2); +} + +void test5() { + SyncHeartbeat *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncHeartbeat2RpcMsg(pMsg, &rpcMsg); + SyncHeartbeat *pMsg2 =syncHeartbeatFromRpcMsg2(&rpcMsg); + syncHeartbeatLog2((char *)"test5: syncHeartbeat2RpcMsg -> syncHeartbeatFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncHeartbeatDestroy(pMsg); + syncHeartbeatDestroy(pMsg2); +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + gRaftDetailLog = true; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +}