diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index be53559e8a..3057e23bc2 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -28,30 +28,25 @@ extern "C" { #include "syncRaftEntry.h" #include "taosdef.h" -// encode as uint64 +// encode as uint32 typedef enum ESyncMessageType { SYNC_PING = 101, SYNC_PING_REPLY = 103, - SYNC_CLIENT_REQUEST, - SYNC_CLIENT_REQUEST_REPLY, - SYNC_REQUEST_VOTE, - SYNC_REQUEST_VOTE_REPLY, - SYNC_APPEND_ENTRIES, - SYNC_APPEND_ENTRIES_REPLY, + SYNC_CLIENT_REQUEST = 105, + SYNC_CLIENT_REQUEST_REPLY = 107, + SYNC_REQUEST_VOTE = 109, + SYNC_REQUEST_VOTE_REPLY = 111, + SYNC_APPEND_ENTRIES = 113, + SYNC_APPEND_ENTRIES_REPLY = 115, } ESyncMessageType; -/* -typedef struct SRaftId { - SyncNodeId addr; // typedef uint64_t SyncNodeId; - SyncGroupId vgId; // typedef int32_t SyncGroupId; -} SRaftId; -*/ - +// --------------------------------------------- typedef struct SyncPing { uint32_t bytes; uint32_t msgType; SRaftId srcId; SRaftId destId; + // private data uint32_t dataLen; char data[]; } SyncPing; @@ -59,28 +54,22 @@ typedef struct SyncPing { #define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) SyncPing* syncPingBuild(uint32_t dataLen); - -void syncPingDestroy(SyncPing* pMsg); - -void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); - -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); - -void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); - -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); - -cJSON* syncPing2Json(const SyncPing* pMsg); - +void syncPingDestroy(SyncPing* pMsg); +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); +cJSON* syncPing2Json(const SyncPing* pMsg); SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); - SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); +// --------------------------------------------- typedef struct SyncPingReply { uint32_t bytes; uint32_t msgType; SRaftId srcId; SRaftId destId; + // private data uint32_t dataLen; char data[]; } SyncPingReply; @@ -89,74 +78,117 @@ typedef struct SyncPingReply { (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) SyncPingReply* syncPingReplyBuild(uint32_t dataLen); - -void syncPingReplyDestroy(SyncPingReply* pMsg); - -void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); - -void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); - -void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); - -void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); - -cJSON* syncPingReply2Json(const SyncPingReply* pMsg); - +void syncPingReplyDestroy(SyncPingReply* pMsg); +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); - SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); +// --------------------------------------------- typedef struct SyncClientRequest { - ESyncMessageType msgType; - char* data; - uint32_t dataLen; - int64_t seqNum; - bool isWeak; + uint32_t bytes; + uint32_t msgType; + int64_t seqNum; + bool isWeak; + uint32_t dataLen; + char data[]; } SyncClientRequest; +// --------------------------------------------- typedef struct SyncClientRequestReply { - ESyncMessageType msgType; - int32_t errCode; - SSyncBuffer* pErrMsg; - SSyncBuffer* pLeaderHint; + uint32_t bytes; + uint32_t msgType; + int32_t errCode; + SRaftId leaderHint; } SyncClientRequestReply; +// --------------------------------------------- typedef struct SyncRequestVote { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - SyncGroupId vgId; - SyncIndex lastLogIndex; - SyncTerm lastLogTerm; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm currentTerm; + SyncIndex lastLogIndex; + SyncTerm lastLogTerm; } SyncRequestVote; +SyncRequestVote* syncRequestVoteBuild(); +void syncRequestVoteDestroy(SyncRequestVote* pMsg); +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); + +// --------------------------------------------- typedef struct SyncRequestVoteReply { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - SyncGroupId vgId; - bool voteGranted; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncTerm term; + bool voteGranted; } SyncRequestVoteReply; +SyncRequestVoteReply* SyncRequestVoteReplyBuild(); +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); + +// --------------------------------------------- typedef struct SyncAppendEntries { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - SyncIndex prevLogIndex; - SyncTerm prevLogTerm; - int32_t entryCount; - SSyncRaftEntry* logEntries; - SyncIndex commitIndex; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + SyncIndex prevLogIndex; + SyncTerm prevLogTerm; + SyncIndex commitIndex; + uint32_t dataLen; + char data[]; } SyncAppendEntries; +#define SYNC_APPEND_ENTRIES_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \ + sizeof(SyncIndex) + sizeof(uint32_t)) + +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen); +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); + +// --------------------------------------------- typedef struct SyncAppendEntriesReply { - ESyncMessageType msgType; - SyncTerm currentTerm; - SyncNodeId nodeId; - bool success; - SyncIndex matchIndex; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + // private data + bool success; + SyncIndex matchIndex; } SyncAppendEntriesReply; +SyncAppendEntriesReply* syncAppendEntriesReplyBuild(); +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 8d04b5bb26..3ba145a96b 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -211,23 +211,17 @@ static void *syncIOConsumerFunc(void *param) { if (pRpcMsg->msgType == SYNC_PING) { if (io->FpOnSyncPing != NULL) { SyncPing *pSyncMsg; - - SRpcMsg tmpRpcMsg; - memcpy(&tmpRpcMsg, pRpcMsg, sizeof(SRpcMsg)); - pSyncMsg = syncPingBuild(tmpRpcMsg.contLen); - + pSyncMsg = syncPingBuild(pRpcMsg->contLen); syncPingFromRpcMsg(pRpcMsg, pSyncMsg); - // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); - io->FpOnSyncPing(io->pSyncNode, pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { - SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); - syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); - if (io->FpOnSyncPingReply != NULL) { + SyncPingReply *pSyncMsg; + pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); + syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); } } else { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 49fac038da..9cb3a61fff 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -171,16 +171,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn SRpcMsg rpcMsg; syncPing2RpcMsg(pMsg, &rpcMsg); - - /* - SRpcMsg rpcMsg; - rpcMsg.contLen = 64; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "xxxxxxxxxxxxxx"); - rpcMsg.handle = NULL; - rpcMsg.msgType = 1; - */ - syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index a26c8401b2..4c44b4691c 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -60,12 +60,15 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { } cJSON* syncPing2Json(const SyncPing* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; @@ -79,7 +82,8 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; @@ -154,12 +158,15 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { } cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); { uint64_t u64 = pMsg->srcId.addr; cJSON* pTmp = pSrcId; @@ -173,7 +180,8 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); { uint64_t u64 = pMsg->destId.addr; cJSON* pTmp = pDestId; @@ -208,72 +216,345 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) return pMsg; } -#if 0 -void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) { - *bufLen = sizeof(SyncPing) + pMsg->dataLen; - *ppBuf = (char*)malloc(*bufLen); - void* pStart = *ppBuf; - uint32_t allBytes = *bufLen; - - int len = 0; - len = taosEncodeFixedU32(&pStart, pMsg->msgType); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedU64(&pStart, pMsg->destId.addr); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId); - allBytes -= len; - assert(len > 0); - pStart += len; - - len = taosEncodeFixedU32(&pStart, pMsg->dataLen); - allBytes -= len; - assert(len > 0); - pStart += len; - - memcpy(pStart, pMsg->data, pMsg->dataLen); - allBytes -= pMsg->dataLen; - assert(allBytes == 0); +// ---- message process SyncRequestVote---- +SyncRequestVote* syncRequestVoteBuild() { + uint32_t bytes = sizeof(SyncRequestVote); + SyncRequestVote* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_REQUEST_VOTE; } - -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { - void* pStart = (void*)buf; - uint64_t u64; - int32_t i32; - uint32_t u32; - - pStart = taosDecodeFixedU64(pStart, &u64); - pMsg->msgType = u64; - - pStart = taosDecodeFixedU64(pStart, &u64); - pMsg->srcId.addr = u64; - - pStart = taosDecodeFixedI32(pStart, &i32); - pMsg->srcId.vgId = i32; - - pStart = taosDecodeFixedU64(pStart, &u64); - pMsg->destId.addr = u64; - - pStart = taosDecodeFixedI32(pStart, &i32); - pMsg->destId.vgId = i32; - - pStart = taosDecodeFixedU32(pStart, &u32); - pMsg->dataLen = u32; +void syncRequestVoteDestroy(SyncRequestVote* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } } -#endif \ No newline at end of file + +void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { + syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->currentTerm); + cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); + cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); + cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot); + return pJson; +} + +// ---- message process SyncRequestVoteReply---- +SyncRequestVoteReply* SyncRequestVoteReplyBuild() { + uint32_t bytes = sizeof(SyncRequestVoteReply); + SyncRequestVoteReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_REQUEST_VOTE_REPLY; +} + +void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncRequestVoteReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg) { + syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "vote_granted", pMsg->voteGranted); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncRequestVoteReply", pRoot); + return pJson; +} + +// ---- message process SyncAppendEntries---- +SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_APPEND_ENTRIES_FIX_LEN + dataLen; + SyncAppendEntries* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_APPEND_ENTRIES; + pMsg->dataLen = dataLen; +} + +void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_APPEND_ENTRIES_FIX_LEN + pMsg->dataLen); +} + +void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg) { + syncAppendEntriesDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); + cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); + cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); + cJSON_AddStringToObject(pRoot, "commit_index", u64buf); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); + return pJson; +} + +// ---- message process SyncAppendEntriesReply---- +SyncAppendEntriesReply* syncAppendEntriesReplyBuild() { + uint32_t bytes = sizeof(SyncAppendEntriesReply); + SyncAppendEntriesReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY; +} + +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) { + syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "success", pMsg->success); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); + cJSON_AddStringToObject(pRoot, "match_index", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); + return pJson; +} \ No newline at end of file diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 4deceb603e..6197621051 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -3,6 +3,7 @@ #include "syncIO.h" #include "syncInt.h" #include "syncMessage.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -14,6 +15,7 @@ void logTest() { } #define PING_MSG_LEN 20 +#define APPEND_ENTRIES_VALUE_LEN 32 void test1() { sTrace("test1: ---- syncPingSerialize, syncPingDeserialize"); @@ -21,16 +23,16 @@ void test1() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "test ping"); SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); - pMsg->srcId.addr = 1; - pMsg->srcId.vgId = 2; - pMsg->destId.addr = 3; - pMsg->destId.vgId = 4; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPing: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -45,7 +47,7 @@ void test1() { { cJSON* pJson = syncPing2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPing2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -61,16 +63,16 @@ void test2() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "hello raft"); SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); - pMsg->srcId.addr = 100; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 3333); pMsg->srcId.vgId = 200; - pMsg->destId.addr = 300; - pMsg->destId.vgId = 400; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 4444); + pMsg->destId.vgId = 200; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPing: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -84,7 +86,7 @@ void test2() { { cJSON* pJson = syncPing2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPing2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -99,16 +101,16 @@ void test3() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "test ping"); SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); - pMsg->srcId.addr = 19; - pMsg->srcId.vgId = 29; - pMsg->destId.addr = 39; - pMsg->destId.vgId = 49; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 5555); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 6666); + pMsg->destId.vgId = 100; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -123,7 +125,7 @@ void test3() { { cJSON* pJson = syncPingReply2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -139,16 +141,16 @@ void test4() { char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "hello raft"); SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); - pMsg->srcId.addr = 66; - pMsg->srcId.vgId = 77; - pMsg->destId.addr = 88; - pMsg->destId.vgId = 99; + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 7777); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 8888); + pMsg->destId.vgId = 100; memcpy(pMsg->data, msg, PING_MSG_LEN); { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -162,7 +164,7 @@ void test4() { { cJSON* pJson = syncPingReply2Json(pMsg2); char* serialized = cJSON_Print(pJson); - printf("SyncPingReply2: \n%s\n\n", serialized); + printf("\n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } @@ -170,6 +172,321 @@ void test4() { syncPingReplyDestroy(pMsg); syncPingReplyDestroy(pMsg2); } + +void test5() { + sTrace("test5: ---- syncRequestVoteSerialize, syncRequestVoteDeserialize"); + + SyncRequestVote* pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 20; + pMsg->lastLogIndex = 21; + pMsg->lastLogTerm = 22; + + { + cJSON* pJson = syncRequestVote2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncRequestVoteSerialize(pMsg, buf, bufLen); + + SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes); + syncRequestVoteDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncRequestVote2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); + free(buf); +} + +void test6() { + sTrace("test6: ---- syncRequestVote2RpcMsg, syncRequestVoteFromRpcMsg"); + + SyncRequestVote* pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 20; + pMsg->lastLogIndex = 21; + pMsg->lastLogTerm = 22; + + { + cJSON* pJson = syncRequestVote2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes); + syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncRequestVote2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); +} + +void test7() { + sTrace("test7: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"); + + SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 20; + pMsg->voteGranted = 1; + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncRequestVoteReplySerialize(pMsg, buf, bufLen); + + SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes); + syncRequestVoteReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); + free(buf); +} + +void test8() { + sTrace("test8: ---- syncRequestVoteReply2RpcMsg, syncRequestVoteReplyFromRpcMsg"); + + SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 20; + pMsg->voteGranted = 1; + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes); + syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncRequestVoteReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test9() { + sTrace("test9: ---- syncAppendEntriesSerialize, syncAppendEntriesDeserialize"); + + char msg[APPEND_ENTRIES_VALUE_LEN]; + snprintf(msg, sizeof(msg), "%s", "test value"); + SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 55; + pMsg->prevLogTerm = 66; + pMsg->commitIndex = 77; + memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncAppendEntriesSerialize(pMsg, buf, bufLen); + + SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes); + syncAppendEntriesDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); + free(buf); +} + +void test10() { + sTrace("test10: ---- syncAppendEntries2RpcMsg, syncAppendEntriesFromRpcMsg"); + + char msg[APPEND_ENTRIES_VALUE_LEN]; + snprintf(msg, sizeof(msg), "%s", "test value"); + SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 55; + pMsg->prevLogTerm = 66; + pMsg->commitIndex = 77; + memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes); + syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncAppendEntries2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test11() { + sTrace("test11: ---- syncAppendEntriesReplySerialize, syncAppendEntriesReplyDeserialize"); + + SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->success = 1; + pMsg->matchIndex = 23; + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncAppendEntriesReplySerialize(pMsg, buf, bufLen); + + SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes); + syncAppendEntriesReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); + free(buf); +} + +void test12() { + sTrace("test12: ---- syncAppendEntriesReply2RpcMsg, syncAppendEntriesReplyFromRpcMsg"); + + SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); + pMsg->destId.vgId = 100; + pMsg->success = 1; + pMsg->matchIndex = 23; + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes); + syncAppendEntriesReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("\n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); +} + int main() { // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; @@ -179,6 +496,14 @@ int main() { test2(); test3(); test4(); + test5(); + test6(); + test7(); + test8(); + test9(); + test10(); + test11(); + test12(); return 0; } diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 24d9ead5e3..8268128347 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -13,7 +13,9 @@ void logTest() { sFatal("--- sync log test: fatal"); } -SSyncNode* doSync() { +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* doSync(int myIndex) { SSyncFSM* pFsm; SSyncInfo syncInfo; @@ -24,18 +26,18 @@ SSyncNode* doSync() { snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); SSyncCfg* pCfg = &syncInfo.syncCfg; - pCfg->myIndex = 0; - pCfg->replicaNum = 2; + pCfg->myIndex = myIndex; + pCfg->replicaNum = 3; - pCfg->nodeInfo[0].nodePort = 7010; + pCfg->nodeInfo[0].nodePort = ports[0]; snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - pCfg->nodeInfo[1].nodePort = 7110; + pCfg->nodeInfo[1].nodePort = ports[1]; snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - pCfg->nodeInfo[2].nodePort = 7210; + pCfg->nodeInfo[2].nodePort = ports[2]; snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); @@ -53,20 +55,25 @@ void timerPingAll(void* param, void* tmrId) { syncNodePingAll(pSyncNode); } -int main() { +int main(int argc, char** argv) { // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; logTest(); - int32_t ret = syncIOStart((char*)"127.0.0.1", 7010); + int myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); - SSyncNode* pSyncNode = doSync(); + SSyncNode* pSyncNode = doSync(myIndex); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; @@ -74,9 +81,9 @@ int main() { assert(ret == 0); /* - taosMsleep(10000); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); */ while (1) {