From 12c202aa316ad0f7288060b25a49047f676f76ee Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 1 Mar 2022 18:35:07 +0800 Subject: [PATCH] sync encode test --- source/libs/sync/inc/syncMessage.h | 46 +++++-- source/libs/sync/src/syncMessage.c | 166 +++++++++++++++-------- source/libs/sync/test/syncEncodeTest.cpp | 137 +++++++++++++++---- 3 files changed, 251 insertions(+), 98 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 121314c589..a7de7b9019 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -58,6 +58,20 @@ typedef struct SyncPing { #define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) +SyncPing* syncPingBuild(uint32_t dataLen); + +void syncPingDestroy(SyncPing* pMsg); + +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); + +cJSON* syncPing2Json(const SyncPing* pMsg); + typedef struct SyncPingReply { uint32_t bytes; uint32_t msgType; @@ -67,6 +81,23 @@ typedef struct SyncPingReply { char data[]; } SyncPingReply; +#define SYNC_PING_REPLY_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) + +SyncPingReply* syncPingReplyBuild(uint32_t dataLen); + +void syncPingReplyDestroy(SyncPingReply* pMsg); + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); + typedef struct SyncClientRequest { ESyncMessageType msgType; char* data; @@ -118,21 +149,6 @@ typedef struct SyncAppendEntriesReply { SyncIndex matchIndex; } SyncAppendEntriesReply; -// ---- message build ---- -SyncPing* syncPingBuild(uint32_t dataLen); - -void syncPingDestroy(SyncPing* pSyncPing); - -void syncPingSerialize(const SyncPing* pSyncPing, char* buf, uint32_t bufLen); - -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing); - -void syncPing2RpcMsg(const SyncPing* pSyncPing, SRpcMsg* pRpcMsg); - -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pSyncPing); - -cJSON* syncPing2Json(const SyncPing* pSyncPing); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 80dd184018..3122aa66bb 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -19,136 +19,194 @@ void onMessage(SRaft* pRaft, void* pMsg) {} -// ---- message build ---- +// ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; - SyncPing* pSyncPing = malloc(bytes); - memset(pSyncPing, 0, bytes); - pSyncPing->bytes = bytes; - pSyncPing->msgType = SYNC_PING; - pSyncPing->dataLen = dataLen; + SyncPing* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING; + pMsg->dataLen = dataLen; } -void syncPingDestroy(SyncPing* pSyncPing) { - if (pSyncPing != NULL) { - free(pSyncPing); +void syncPingDestroy(SyncPing* pMsg) { + if (pMsg != NULL) { + free(pMsg); } } -void syncPingSerialize(const SyncPing* pSyncPing, char* buf, uint32_t bufLen) { - assert(pSyncPing->bytes <= bufLen); - memcpy(buf, pSyncPing, pSyncPing->bytes); +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); } -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing) { - /* - uint32_t* pU32 = (uint32_t*)buf; - uint32_t bytes = *pU32; - pSyncPing = (SyncPing*)malloc(bytes); - */ - memcpy(pSyncPing, buf, len); - assert(len == pSyncPing->bytes); - assert(pSyncPing->bytes == SYNC_PING_FIX_LEN + pSyncPing->dataLen); +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); } -void syncPing2RpcMsg(const SyncPing* pSyncPing, SRpcMsg* pRpcMsg) { - pRpcMsg->msgType = pSyncPing->msgType; - pRpcMsg->contLen = pSyncPing->bytes; +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncPingSerialize(pSyncPing, pRpcMsg->pCont, pRpcMsg->contLen); + syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); } -void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pSyncPing) { - syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pSyncPing); +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { + syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } -cJSON* syncPing2Json(const SyncPing* pSyncPing) { +cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON* pRoot = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRoot, "bytes", pSyncPing->bytes); - cJSON_AddNumberToObject(pRoot, "msgType", pSyncPing->msgType); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); cJSON* pSrcId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pSrcId, "addr", pSyncPing->srcId.addr); - cJSON_AddNumberToObject(pSrcId, "vgId", pSyncPing->srcId.vgId); + cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pSrcId); cJSON* pDestId = cJSON_CreateObject(); - cJSON_AddNumberToObject(pDestId, "addr", pSyncPing->destId.addr); - cJSON_AddNumberToObject(pDestId, "vgId", pSyncPing->destId.vgId); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); cJSON_AddItemToObject(pRoot, "srcId", pDestId); - cJSON_AddNumberToObject(pRoot, "dataLen", pSyncPing->dataLen); - cJSON_AddStringToObject(pRoot, "data", pSyncPing->data); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); - return pRoot; + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + +// ---- message process SyncPingReply---- +SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen; + SyncPingReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING; + pMsg->dataLen = dataLen; +} + +void syncPingReplyDestroy(SyncPingReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); +} + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { + syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); + return pJson; } #if 0 -void syncPingSerialize(const SyncPing* pSyncPing, char** ppBuf, uint32_t* bufLen) { - *bufLen = sizeof(SyncPing) + pSyncPing->dataLen; +void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) { + *bufLen = sizeof(SyncPing) + pMsg->dataLen; *ppBuf = (char*)malloc(*bufLen); void* pStart = *ppBuf; uint32_t allBytes = *bufLen; int len = 0; - len = taosEncodeFixedU32(&pStart, pSyncPing->msgType); + len = taosEncodeFixedU32(&pStart, pMsg->msgType); allBytes -= len; assert(len > 0); pStart += len; - len = taosEncodeFixedU64(&pStart, pSyncPing->srcId.addr); + len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr); allBytes -= len; assert(len > 0); pStart += len; - len = taosEncodeFixedI32(&pStart, pSyncPing->srcId.vgId); + len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId); allBytes -= len; assert(len > 0); pStart += len; - len = taosEncodeFixedU64(&pStart, pSyncPing->destId.addr); + len = taosEncodeFixedU64(&pStart, pMsg->destId.addr); allBytes -= len; assert(len > 0); pStart += len; - len = taosEncodeFixedI32(&pStart, pSyncPing->destId.vgId); + len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId); allBytes -= len; assert(len > 0); pStart += len; - len = taosEncodeFixedU32(&pStart, pSyncPing->dataLen); + len = taosEncodeFixedU32(&pStart, pMsg->dataLen); allBytes -= len; assert(len > 0); pStart += len; - memcpy(pStart, pSyncPing->data, pSyncPing->dataLen); - allBytes -= pSyncPing->dataLen; + memcpy(pStart, pMsg->data, pMsg->dataLen); + allBytes -= pMsg->dataLen; assert(allBytes == 0); } -void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pSyncPing) { +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { void* pStart = (void*)buf; uint64_t u64; int32_t i32; uint32_t u32; pStart = taosDecodeFixedU64(pStart, &u64); - pSyncPing->msgType = u64; + pMsg->msgType = u64; pStart = taosDecodeFixedU64(pStart, &u64); - pSyncPing->srcId.addr = u64; + pMsg->srcId.addr = u64; pStart = taosDecodeFixedI32(pStart, &i32); - pSyncPing->srcId.vgId = i32; + pMsg->srcId.vgId = i32; pStart = taosDecodeFixedU64(pStart, &u64); - pSyncPing->destId.addr = u64; + pMsg->destId.addr = u64; pStart = taosDecodeFixedI32(pStart, &i32); - pSyncPing->destId.vgId = i32; + pMsg->destId.vgId = i32; pStart = taosDecodeFixedU32(pStart, &u32); - pSyncPing->dataLen = u32; + pMsg->dataLen = u32; } #endif \ No newline at end of file diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 029e2bd496..4deceb603e 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -16,59 +16,59 @@ void logTest() { #define PING_MSG_LEN 20 void test1() { - sTrace("test1: ----"); + sTrace("test1: ---- syncPingSerialize, syncPingDeserialize"); char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "test ping"); - SyncPing* pSyncPing = syncPingBuild(PING_MSG_LEN); - pSyncPing->srcId.addr = 1; - pSyncPing->srcId.vgId = 2; - pSyncPing->destId.addr = 3; - pSyncPing->destId.vgId = 4; - memcpy(pSyncPing->data, msg, PING_MSG_LEN); + SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); + pMsg->srcId.addr = 1; + pMsg->srcId.vgId = 2; + pMsg->destId.addr = 3; + pMsg->destId.vgId = 4; + memcpy(pMsg->data, msg, PING_MSG_LEN); { - cJSON* pJson = syncPing2Json(pSyncPing); + cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); printf("SyncPing: \n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } - uint32_t bufLen = pSyncPing->bytes; + uint32_t bufLen = pMsg->bytes; char* buf = (char*)malloc(bufLen); - syncPingSerialize(pSyncPing, buf, bufLen); + syncPingSerialize(pMsg, buf, bufLen); - SyncPing* pSyncPing2 = (SyncPing*)malloc(pSyncPing->bytes); - syncPingDeserialize(buf, bufLen, pSyncPing2); + SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); + syncPingDeserialize(buf, bufLen, pMsg2); { - cJSON* pJson = syncPing2Json(pSyncPing2); + cJSON* pJson = syncPing2Json(pMsg2); char* serialized = cJSON_Print(pJson); printf("SyncPing2: \n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } - syncPingDestroy(pSyncPing); - syncPingDestroy(pSyncPing2); + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); free(buf); } void test2() { - sTrace("test2: ----"); + sTrace("test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg"); char msg[PING_MSG_LEN]; snprintf(msg, sizeof(msg), "%s", "hello raft"); - SyncPing* pSyncPing = syncPingBuild(PING_MSG_LEN); - pSyncPing->srcId.addr = 100; - pSyncPing->srcId.vgId = 200; - pSyncPing->destId.addr = 300; - pSyncPing->destId.vgId = 400; - memcpy(pSyncPing->data, msg, PING_MSG_LEN); + SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); + pMsg->srcId.addr = 100; + pMsg->srcId.vgId = 200; + pMsg->destId.addr = 300; + pMsg->destId.vgId = 400; + memcpy(pMsg->data, msg, PING_MSG_LEN); { - cJSON* pJson = syncPing2Json(pSyncPing); + cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); printf("SyncPing: \n%s\n\n", serialized); free(serialized); @@ -76,23 +76,100 @@ void test2() { } SRpcMsg rpcMsg; - syncPing2RpcMsg(pSyncPing, &rpcMsg); - SyncPing* pSyncPing2 = (SyncPing*)malloc(pSyncPing->bytes); - syncPingFromRpcMsg(&rpcMsg, pSyncPing2); + syncPing2RpcMsg(pMsg, &rpcMsg); + SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); + syncPingFromRpcMsg(&rpcMsg, pMsg2); rpcFreeCont(rpcMsg.pCont); { - cJSON* pJson = syncPing2Json(pSyncPing2); + cJSON* pJson = syncPing2Json(pMsg2); char* serialized = cJSON_Print(pJson); printf("SyncPing2: \n%s\n\n", serialized); free(serialized); cJSON_Delete(pJson); } - syncPingDestroy(pSyncPing); - syncPingDestroy(pSyncPing2); + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); } +void test3() { + sTrace("test3: ---- syncPingReplySerialize, syncPingReplyDeserialize"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "test ping"); + SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); + pMsg->srcId.addr = 19; + pMsg->srcId.vgId = 29; + pMsg->destId.addr = 39; + pMsg->destId.vgId = 49; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncPingReplySerialize(pMsg, buf, bufLen); + + SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); + syncPingReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncPingReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); + free(buf); +} + +void test4() { + sTrace("test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "hello raft"); + SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); + pMsg->srcId.addr = 66; + pMsg->srcId.vgId = 77; + pMsg->destId.addr = 88; + pMsg->destId.vgId = 99; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); + syncPingReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncPingReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} int main() { // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; @@ -100,6 +177,8 @@ int main() { test1(); test2(); + test3(); + test4(); return 0; }