diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 8578d242dd..cc4b8d39d4 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -186,13 +186,23 @@ typedef struct SyncRequestVoteReply { bool voteGranted; } SyncRequestVoteReply; -SyncRequestVoteReply* SyncRequestVoteReplyBuild(); +SyncRequestVoteReply* syncRequestVoteReplyBuild(); void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len); +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len); void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg); + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); // --------------------------------------------- typedef struct SyncAppendEntries { diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 9d2afb03ca..18ce13a585 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -263,7 +263,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg; - pSyncMsg = SyncRequestVoteReplyBuild(); + pSyncMsg = syncRequestVoteReplyBuild(); syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3b95835e07..5bd5321ec6 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -472,6 +472,7 @@ void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; } cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { @@ -557,7 +558,7 @@ void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) { } // ---- message process SyncRequestVoteReply---- -SyncRequestVoteReply* SyncRequestVoteReplyBuild() { +SyncRequestVoteReply* syncRequestVoteReplyBuild() { uint32_t bytes = sizeof(SyncRequestVoteReply); SyncRequestVoteReply* pMsg = malloc(bytes); memset(pMsg, 0, bytes); @@ -582,6 +583,25 @@ void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestV assert(len == pMsg->bytes); } +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncRequestVoteReply* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncRequestVoteReplyDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -594,6 +614,11 @@ void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { char u64buf[128]; @@ -639,6 +664,40 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { return pJson; } +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + printf("syncRequestVoteReplyPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + printf("syncRequestVoteReplyPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + sTrace("syncRequestVoteReplyLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + sTrace("syncRequestVoteReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ---- message process SyncAppendEntries---- SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { uint32_t bytes = SYNC_APPEND_ENTRIES_FIX_LEN + dataLen; diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index e442dec890..71d2c5c3c9 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -18,6 +18,7 @@ add_executable(syncIndexMgrTest "") add_executable(syncLogStoreTest "") add_executable(syncEntryTest "") add_executable(syncRequestVote "") +add_executable(syncRequestVoteReply "") target_sources(syncTest @@ -100,6 +101,10 @@ target_sources(syncRequestVote PRIVATE "syncRequestVote.cpp" ) +target_sources(syncRequestVoteReply + PRIVATE + "syncRequestVoteReply.cpp" +) target_include_directories(syncTest @@ -202,6 +207,11 @@ target_include_directories(syncRequestVote "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncRequestVoteReply + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -284,6 +294,10 @@ target_link_libraries(syncRequestVote sync gtest_main ) +target_link_libraries(syncRequestVoteReply + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncRequestVote.cpp b/source/libs/sync/test/syncRequestVote.cpp index 31fb5a5307..e46eb87fe9 100644 --- a/source/libs/sync/test/syncRequestVote.cpp +++ b/source/libs/sync/test/syncRequestVote.cpp @@ -38,7 +38,7 @@ void test2() { syncRequestVoteSerialize(pMsg, serialized, len); SyncRequestVote *pMsg2 = syncRequestVoteBuild(); syncRequestVoteDeserialize(serialized, len, pMsg2); - syncRequestVotePrint2((char *)"test2: syncRequestVoteSerialize -> syncRequestVoteDeserialize ", pMsg); + syncRequestVotePrint2((char *)"test2: syncRequestVoteSerialize -> syncRequestVoteDeserialize ", pMsg2); free(serialized); syncRequestVoteDestroy(pMsg); @@ -50,7 +50,7 @@ void test3() { uint32_t len; char * serialized = syncRequestVoteSerialize2(pMsg, &len); SyncRequestVote *pMsg2 = syncRequestVoteDeserialize2(serialized, len); - syncRequestVotePrint2((char *)"test3: syncRequestVoteSerialize3 -> syncRequestVoteDeserialize2 ", pMsg); + syncRequestVotePrint2((char *)"test3: syncRequestVoteSerialize3 -> syncRequestVoteDeserialize2 ", pMsg2); free(serialized); syncRequestVoteDestroy(pMsg); @@ -63,7 +63,7 @@ void test4() { syncRequestVote2RpcMsg(pMsg, &rpcMsg); SyncRequestVote *pMsg2 = syncRequestVoteBuild(); syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2); - syncRequestVotePrint2((char *)"test4: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg ", pMsg); + syncRequestVotePrint2((char *)"test4: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg ", pMsg2); syncRequestVoteDestroy(pMsg); syncRequestVoteDestroy(pMsg2); @@ -74,7 +74,7 @@ void test5() { SRpcMsg rpcMsg; syncRequestVote2RpcMsg(pMsg, &rpcMsg); SyncRequestVote *pMsg2 = syncRequestVoteFromRpcMsg2(&rpcMsg); - syncRequestVotePrint2((char *)"test5: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg2 ", pMsg); + syncRequestVotePrint2((char *)"test5: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg2 ", pMsg2); syncRequestVoteDestroy(pMsg); syncRequestVoteDestroy(pMsg2); diff --git a/source/libs/sync/test/syncRequestVoteReply.cpp b/source/libs/sync/test/syncRequestVoteReply.cpp new file mode 100644 index 0000000000..37c00fac81 --- /dev/null +++ b/source/libs/sync/test/syncRequestVoteReply.cpp @@ -0,0 +1,95 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncRequestVoteReply *createMsg() { + SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 77; + pMsg->voteGranted = true; +} + +void test1() { + SyncRequestVoteReply *pMsg = createMsg(); + syncRequestVoteReplyPrint2((char *)"test1:", pMsg); + syncRequestVoteReplyDestroy(pMsg); +} + +void test2() { + SyncRequestVoteReply *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncRequestVoteReplySerialize(pMsg, serialized, len); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyBuild(); + syncRequestVoteReplyDeserialize(serialized, len, pMsg2); + syncRequestVoteReplyPrint2((char *)"test2: syncRequestVoteReplySerialize -> syncRequestVoteReplyDeserialize ", pMsg2); + + free(serialized); + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test3() { + SyncRequestVoteReply *pMsg = createMsg(); + uint32_t len; + char * serialized = syncRequestVoteReplySerialize2(pMsg, &len); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyDeserialize2(serialized, len); + syncRequestVoteReplyPrint2((char *)"test3: syncRequestVoteReplySerialize3 -> syncRequestVoteReplyDeserialize2 ", pMsg2); + + free(serialized); + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test4() { + SyncRequestVoteReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyBuild(); + syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2); + syncRequestVoteReplyPrint2((char *)"test4: syncRequestVoteReply2RpcMsg -> syncRequestVoteReplyFromRpcMsg ", pMsg2); + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test5() { + SyncRequestVoteReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyFromRpcMsg2(&rpcMsg); + syncRequestVoteReplyPrint2((char *)"test5: syncRequestVoteReply2RpcMsg -> syncRequestVoteReplyFromRpcMsg2 ", pMsg2); + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 504fa3034a..588eb32ffd 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -118,7 +118,7 @@ int main(int argc, char** argv) { } for (int i = 0; i < replicaNum; ++i) { - SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild(); + SyncRequestVoteReply* reply = syncRequestVoteReplyBuild(); reply->destId = pSyncNode->myRaftId; reply->srcId = ids[i]; reply->term = term; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 0b6abef212..76fd6fab4e 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -118,7 +118,7 @@ int main(int argc, char** argv) { } for (int i = 0; i < replicaNum; ++i) { - SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild(); + SyncRequestVoteReply* reply = syncRequestVoteReplyBuild(); reply->destId = pSyncNode->myRaftId; reply->srcId = ids[i]; reply->term = term;