From f263a623dd7186525c28bb79668426158404a71e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Mar 2022 14:52:30 +0800 Subject: [PATCH] sync encode/decode --- source/libs/sync/inc/syncMessage.h | 162 ++++++++------- source/libs/sync/src/syncMessage.c | 245 ++++++++++++++++------- source/libs/sync/test/syncEncodeTest.cpp | 129 +++++++++--- 3 files changed, 368 insertions(+), 168 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index be53559e8a..a51567d1dd 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -28,30 +28,25 @@ extern "C" { #include "syncRaftEntry.h" #include "taosdef.h" -// encode as uint64 +// encode as uint32 typedef enum ESyncMessageType { SYNC_PING = 101, SYNC_PING_REPLY = 103, - SYNC_CLIENT_REQUEST, - SYNC_CLIENT_REQUEST_REPLY, - SYNC_REQUEST_VOTE, - SYNC_REQUEST_VOTE_REPLY, - SYNC_APPEND_ENTRIES, - SYNC_APPEND_ENTRIES_REPLY, + SYNC_CLIENT_REQUEST = 105, + SYNC_CLIENT_REQUEST_REPLY = 107, + SYNC_REQUEST_VOTE = 109, + SYNC_REQUEST_VOTE_REPLY = 111, + SYNC_APPEND_ENTRIES = 113, + SYNC_APPEND_ENTRIES_REPLY = 115, } ESyncMessageType; -/* -typedef struct SRaftId { - SyncNodeId addr; // typedef uint64_t SyncNodeId; - SyncGroupId vgId; // typedef int32_t SyncGroupId; -} SRaftId; -*/ - +// --------------------------------------------- typedef struct SyncPing { uint32_t bytes; uint32_t msgType; SRaftId srcId; SRaftId destId; + // private data uint32_t dataLen; char data[]; } SyncPing; @@ -59,28 +54,22 @@ typedef struct SyncPing { #define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) SyncPing* syncPingBuild(uint32_t dataLen); - -void syncPingDestroy(SyncPing* pMsg); - -void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); - -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); - -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); - -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); - -cJSON* syncPing2Json(const SyncPing* pMsg); - +void syncPingDestroy(SyncPing* pMsg); +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); +cJSON* syncPing2Json(const SyncPing* pMsg); SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); - SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); +// --------------------------------------------- typedef struct SyncPingReply { uint32_t bytes; uint32_t msgType; SRaftId srcId; SRaftId destId; + // private data uint32_t dataLen; char data[]; } SyncPingReply; @@ -89,72 +78,95 @@ typedef struct SyncPingReply { (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) SyncPingReply* syncPingReplyBuild(uint32_t dataLen); - -void syncPingReplyDestroy(SyncPingReply* pMsg); - -void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); - -void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); - -void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); - -void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); - -cJSON* syncPingReply2Json(const SyncPingReply* pMsg); - +void syncPingReplyDestroy(SyncPingReply* pMsg); +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); - SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); +// --------------------------------------------- typedef struct SyncClientRequest { - ESyncMessageType msgType; - char* data; - uint32_t dataLen; - int64_t seqNum; - bool isWeak; + uint32_t bytes; + uint32_t msgType; + int64_t seqNum; + bool isWeak; + uint32_t dataLen; + char data[]; } SyncClientRequest; +// --------------------------------------------- typedef struct SyncClientRequestReply { - ESyncMessageType msgType; - int32_t errCode; - SSyncBuffer* pErrMsg; - SSyncBuffer* pLeaderHint; + uint32_t bytes; + uint32_t msgType; + int32_t errCode; + SRaftId leaderHint; } SyncClientRequestReply; +// --------------------------------------------- typedef struct SyncRequestVote { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - SyncGroupId vgId; - SyncIndex lastLogIndex; - SyncTerm lastLogTerm; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm currentTerm; + SyncIndex lastLogIndex; + SyncTerm lastLogTerm; } SyncRequestVote; +SyncRequestVote* syncRequestVoteBuild(); +void syncRequestVoteDestroy(SyncRequestVote* pMsg); +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); + +// --------------------------------------------- typedef struct SyncRequestVoteReply { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - SyncGroupId vgId; - bool voteGranted; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm term; + bool voteGranted; } SyncRequestVoteReply; +SyncRequestVoteReply* SyncRequestVoteReplyBuild(); +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); + +// --------------------------------------------- typedef struct SyncAppendEntries { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; - int32_t entryCount; - SSyncRaftEntry* logEntries; - SyncIndex commitIndex; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; + SyncIndex commitIndex; + uint32_t dataLen; + char data[]; } SyncAppendEntries; +// --------------------------------------------- typedef struct SyncAppendEntriesReply { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - bool success; - SyncIndex matchIndex; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + bool success; + SyncIndex matchIndex; } SyncAppendEntriesReply; #ifdef __cplusplus diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index a26c8401b2..ac3f550e3e 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -60,12 +60,15 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { } cJSON* syncPing2Json(const SyncPing* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; @@ -79,7 +82,8 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; @@ -154,12 +158,15 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { } cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; @@ -173,7 +180,8 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; @@ -208,72 +216,169 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) return pMsg; } -#if 0 -void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) { - *bufLen = sizeof(SyncPing) + pMsg->dataLen; - *ppBuf = (char*)malloc(*bufLen); - void* pStart = *ppBuf; - uint32_t allBytes = *bufLen; - - int len = 0; - len = taosEncodeFixedU32(&pStart, pMsg->msgType); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedU64(&pStart, pMsg->destId.addr); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedU32(&pStart, pMsg->dataLen); - allBytes -= len; - assert(len > 0); - pStart += len; - - memcpy(pStart, pMsg->data, pMsg->dataLen); - allBytes -= pMsg->dataLen; - assert(allBytes == 0); +// ---- message process SyncRequestVote---- +SyncRequestVote* syncRequestVoteBuild() { + uint32_t bytes = sizeof(SyncRequestVote); + SyncRequestVote* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_REQUEST_VOTE; } - -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { - void* pStart = (void*)buf; - uint64_t u64; - int32_t i32; - uint32_t u32; - - pStart = taosDecodeFixedU64(pStart, &u64); - pMsg->msgType = u64; - - pStart = taosDecodeFixedU64(pStart, &u64); - pMsg->srcId.addr = u64; - - pStart = taosDecodeFixedI32(pStart, &i32); - pMsg->srcId.vgId = i32; - - pStart = taosDecodeFixedU64(pStart, &u64); - pMsg->destId.addr = u64; - - pStart = taosDecodeFixedI32(pStart, &i32); - pMsg->destId.vgId = i32; - - pStart = taosDecodeFixedU32(pStart, &u32); - pMsg->dataLen = u32; +void syncRequestVoteDestroy(SyncRequestVote* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } } -#endif \ No newline at end of file + +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { + syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->currentTerm); + cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); + cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); + cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot); + return pJson; +} + +// ---- message process SyncRequestVoteReply---- +SyncRequestVoteReply* SyncRequestVoteReplyBuild() { + uint32_t bytes = sizeof(SyncRequestVoteReply); + SyncRequestVoteReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_REQUEST_VOTE_REPLY; +} + +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) { + syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); + return pJson; +} \ No newline at end of file diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 4deceb603e..e0179b2c8b 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -3,6 +3,7 @@ #include "syncIO.h" #include "syncInt.h" #include "syncMessage.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -21,16 +22,16 @@ void test1() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "test ping"); SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); - pMsg->srcId.addr = 1; - pMsg->srcId.vgId = 2; - pMsg->destId.addr = 3; - pMsg->destId.vgId = 4; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPing: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -45,7 +46,7 @@ void test1() { { cJSON* pJson = syncPing2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPing2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -61,16 +62,16 @@ void test2() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "hello raft"); SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); - pMsg->srcId.addr = 100; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 3333); pMsg->srcId.vgId = 200; - pMsg->destId.addr = 300; - pMsg->destId.vgId = 400; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 4444); + pMsg->destId.vgId = 200; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPing: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -84,7 +85,7 @@ void test2() { { cJSON* pJson = syncPing2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPing2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -99,16 +100,16 @@ void test3() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "test ping"); SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); - pMsg->srcId.addr = 19; - pMsg->srcId.vgId = 29; - pMsg->destId.addr = 39; - pMsg->destId.vgId = 49; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 5555); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 6666); + pMsg->destId.vgId = 100; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -123,7 +124,7 @@ void test3() { { cJSON* pJson = syncPingReply2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -139,16 +140,16 @@ void test4() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "hello raft"); SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); - pMsg->srcId.addr = 66; - pMsg->srcId.vgId = 77; - pMsg->destId.addr = 88; - pMsg->destId.vgId = 99; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 7777); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 8888); + pMsg->destId.vgId = 100; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -162,7 +163,7 @@ void test4() { { cJSON* pJson = syncPingReply2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -170,6 +171,86 @@ void test4() { syncPingReplyDestroy(pMsg); syncPingReplyDestroy(pMsg2); } + +void test5() { + sTrace("test5: ---- syncRequestVoteSerialize, syncRequestVoteDeserialize"); + + SyncRequestVote* pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 20; + pMsg->lastLogIndex = 21; + pMsg->lastLogTerm = 22; + + { + cJSON* pJson = syncRequestVote2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncRequestVoteSerialize(pMsg, buf, bufLen); + + SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes); + syncRequestVoteDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncRequestVote2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); + free(buf); +} + +void test6() { + sTrace("test6: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"); + + SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 20; + pMsg->voteGranted = 1; + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncRequestVoteReplySerialize(pMsg, buf, bufLen); + + SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes); + syncRequestVoteReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); + free(buf); +} + int main() { // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; @@ -179,6 +260,8 @@ int main() { test2(); test3(); test4(); + test5(); + test6(); return 0; }