From aeb94af5e0b382e2ca29db907de5c6c78c649bf3 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Mar 2022 16:15:18 +0800 Subject: [PATCH] sync encode/decode --- source/libs/sync/inc/syncMessage.h | 20 ++ source/libs/sync/src/syncMessage.c | 176 ++++++++++++++++ source/libs/sync/test/syncEncodeTest.cpp | 244 ++++++++++++++++++++++- 3 files changed, 439 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index a51567d1dd..3057e23bc2 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -158,6 +158,18 @@ typedef struct SyncAppendEntries { char data[]; } SyncAppendEntries; +#define SYNC_APPEND_ENTRIES_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \ + sizeof(SyncIndex) + sizeof(uint32_t)) + +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen); +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); + // --------------------------------------------- typedef struct SyncAppendEntriesReply { uint32_t bytes; @@ -169,6 +181,14 @@ typedef struct SyncAppendEntriesReply { SyncIndex matchIndex; } SyncAppendEntriesReply; +SyncAppendEntriesReply* syncAppendEntriesReplyBuild(); +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index ac3f550e3e..4c44b4691c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -381,4 +381,180 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); return pJson; +} + +// ---- message process SyncAppendEntries---- +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_APPEND_ENTRIES_FIX_LEN + dataLen; + SyncAppendEntries* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_APPEND_ENTRIES; + pMsg->dataLen = dataLen; +} + +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_APPEND_ENTRIES_FIX_LEN + pMsg->dataLen); +} + +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg) { + syncAppendEntriesDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); + cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commit_index", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); + return pJson; +} + +// ---- message process SyncAppendEntriesReply---- +SyncAppendEntriesReply* syncAppendEntriesReplyBuild() { + uint32_t bytes = sizeof(SyncAppendEntriesReply); + SyncAppendEntriesReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY; +} + +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) { + syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "success", pMsg->success); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); + cJSON_AddStringToObject(pRoot, "match_index", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); + return pJson; } \ No newline at end of file diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index e0179b2c8b..6197621051 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -15,6 +15,7 @@ void logTest() { } #define PING_MSG_LEN 20 +#define APPEND_ENTRIES_VALUE_LEN 32 void test1() { sTrace("test1: ---- syncPingSerialize, syncPingDeserialize"); @@ -213,7 +214,45 @@ void test5() { } void test6() { - sTrace("test6: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"); + sTrace("test6: ---- syncRequestVote2RpcMsg, syncRequestVoteFromRpcMsg"); + + SyncRequestVote* pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 20; + pMsg->lastLogIndex = 21; + pMsg->lastLogTerm = 22; + + { + cJSON* pJson = syncRequestVote2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes); + syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncRequestVote2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); +} + +void test7() { + sTrace("test7: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"); SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); @@ -251,6 +290,203 @@ void test6() { free(buf); } +void test8() { + sTrace("test8: ---- syncRequestVoteReply2RpcMsg, syncRequestVoteReplyFromRpcMsg"); + + SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 20; + pMsg->voteGranted = 1; + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes); + syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test9() { + sTrace("test9: ---- syncAppendEntriesSerialize, syncAppendEntriesDeserialize"); + + char msg[APPEND_ENTRIES_VALUE_LEN]; + snprintf(msg, sizeof(msg), "%s", "test value"); + SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 55; + pMsg->prevLogTerm = 66; + pMsg->commitIndex = 77; + memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncAppendEntriesSerialize(pMsg, buf, bufLen); + + SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes); + syncAppendEntriesDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); + free(buf); +} + +void test10() { + sTrace("test10: ---- syncAppendEntries2RpcMsg, syncAppendEntriesFromRpcMsg"); + + char msg[APPEND_ENTRIES_VALUE_LEN]; + snprintf(msg, sizeof(msg), "%s", "test value"); + SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 55; + pMsg->prevLogTerm = 66; + pMsg->commitIndex = 77; + memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes); + syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test11() { + sTrace("test11: ---- syncAppendEntriesReplySerialize, syncAppendEntriesReplyDeserialize"); + + SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->success = 1; + pMsg->matchIndex = 23; + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncAppendEntriesReplySerialize(pMsg, buf, bufLen); + + SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes); + syncAppendEntriesReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); + free(buf); +} + +void test12() { + sTrace("test12: ---- syncAppendEntriesReply2RpcMsg, syncAppendEntriesReplyFromRpcMsg"); + + SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->success = 1; + pMsg->matchIndex = 23; + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes); + syncAppendEntriesReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); +} + int main() { // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; @@ -262,6 +498,12 @@ int main() { test4(); test5(); test6(); + test7(); + test8(); + test9(); + test10(); + test11(); + test12(); return 0; }