diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index 7116ae9d46..63f24b104f 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -42,6 +42,12 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +// for debug ------------------- +void syncIndexMgrPrint(SSyncIndexMgr *pObj); +void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj); +void syncIndexMgrLog(SSyncIndexMgr *pObj); +void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2932240ec1..df277e2d7e 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -211,11 +211,14 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +cJSON* syncNode2Json(const SSyncNode* pSyncNode); +char* syncNode2Str(const SSyncNode* pSyncNode); -// for debug -cJSON* syncNode2Json(const SSyncNode* pSyncNode); -char* syncNode2Str(const SSyncNode* pSyncNode); -void syncNodePrint(char* s, const SSyncNode* pSyncNode); +// for debug -------------- +void syncNodePrint(SSyncNode* pObj); +void syncNodePrint2(char* s, SSyncNode* pObj); +void syncNodeLog(SSyncNode* pObj); +void syncNodeLog2(char* s, SSyncNode* pObj); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 6231cb8399..2876577410 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -47,6 +47,12 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); +// for debug ---------------------- +void syncRpcMsgPrint(SRpcMsg* pMsg); +void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); +void syncRpcMsgLog(SRpcMsg* pMsg); +void syncRpcMsgLog2(char* s, SRpcMsg* pMsg); + // --------------------------------------------- typedef enum ESyncTimeoutType { SYNC_TIMEOUT_PING = 100, @@ -60,17 +66,27 @@ typedef struct SyncTimeout { ESyncTimeoutType timeoutType; uint64_t logicClock; int32_t timerMS; - void* data; + void* data; // need optimized } SyncTimeout; SyncTimeout* syncTimeoutBuild(); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data); void syncTimeoutDestroy(SyncTimeout* pMsg); void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); // +SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); // void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); // cJSON* syncTimeout2Json(const SyncTimeout* pMsg); -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data); +char* syncTimeout2Str(const SyncTimeout* pMsg); // + +// for debug ---------------------- +void syncTimeoutPrint(const SyncTimeout* pMsg); +void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg); +void syncTimeoutLog(const SyncTimeout* pMsg); +void syncTimeoutLog2(char* s, const SyncTimeout* pMsg); // --------------------------------------------- typedef struct SyncPing { @@ -83,17 +99,25 @@ typedef struct SyncPing { char data[]; } SyncPing; -#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) - SyncPing* syncPingBuild(uint32_t dataLen); +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); void syncPingDestroy(SyncPing* pMsg); void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); +char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len); +SyncPing* syncPingDeserialize2(const char* buf, uint32_t len); void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); +SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncPing2Json(const SyncPing* pMsg); -SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); -SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); +char* syncPing2Str(const SyncPing* pMsg); + +// for debug ---------------------- +void syncPingPrint(const SyncPing* pMsg); +void syncPingPrint2(char* s, const SyncPing* pMsg); +void syncPingLog(const SyncPing* pMsg); +void syncPingLog2(char* s, const SyncPing* pMsg); // --------------------------------------------- typedef struct SyncPingReply { @@ -106,18 +130,25 @@ typedef struct SyncPingReply { char data[]; } SyncPingReply; -#define SYNC_PING_REPLY_FIX_LEN \ - (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) - SyncPingReply* syncPingReplyBuild(uint32_t dataLen); +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); void syncPingReplyDestroy(SyncPingReply* pMsg); void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); +char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); // +SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); // void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); +SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); // cJSON* syncPingReply2Json(const SyncPingReply* pMsg); -SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); -SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); +char* syncPingReply2Str(const SyncPingReply* pMsg); // + +// for debug ---------------------- +void syncPingReplyPrint(const SyncPingReply* pMsg); +void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg); +void syncPingReplyLog(const SyncPingReply* pMsg); +void syncPingReplyLog2(char* s, const SyncPingReply* pMsg); // --------------------------------------------- typedef struct SyncClientRequest { @@ -131,13 +162,23 @@ typedef struct SyncClientRequest { } SyncClientRequest; SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); +char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len); +SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len); void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); +SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); +char* syncClientRequest2Str(const SyncClientRequest* pMsg); + +// for debug ---------------------- +void syncClientRequestPrint(const SyncClientRequest* pMsg); +void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); +void syncClientRequestLog(const SyncClientRequest* pMsg); +void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); // --------------------------------------------- typedef struct SyncClientRequestReply { @@ -163,9 +204,19 @@ SyncRequestVote* syncRequestVoteBuild(); void syncRequestVoteDestroy(SyncRequestVote* pMsg); void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen); void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg); +char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len); +SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len); void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg); void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg); +SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg); +char* syncRequestVote2Str(const SyncRequestVote* pMsg); + +// for debug ---------------------- +void syncRequestVotePrint(const SyncRequestVote* pMsg); +void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg); +void syncRequestVoteLog(const SyncRequestVote* pMsg); +void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg); // --------------------------------------------- typedef struct SyncRequestVoteReply { @@ -178,13 +229,23 @@ typedef struct SyncRequestVoteReply { bool voteGranted; } SyncRequestVoteReply; -SyncRequestVoteReply* SyncRequestVoteReplyBuild(); +SyncRequestVoteReply* syncRequestVoteReplyBuild(); void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg); void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen); void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg); +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len); +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len); void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg); void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg); +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg); +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg); + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg); +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg); // --------------------------------------------- typedef struct SyncAppendEntries { @@ -200,17 +261,23 @@ typedef struct SyncAppendEntries { char data[]; } SyncAppendEntries; -#define SYNC_APPEND_ENTRIES_FIX_LEN \ - (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \ - sizeof(SyncIndex) + sizeof(uint32_t)) - SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen); void syncAppendEntriesDestroy(SyncAppendEntries* pMsg); void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen); void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg); +char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len); +SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len); void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg); void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg); +SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg); +char* syncAppendEntries2Str(const SyncAppendEntries* pMsg); + +// for debug ---------------------- +void syncAppendEntriesPrint(const SyncAppendEntries* pMsg); +void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); +void syncAppendEntriesLog(const SyncAppendEntries* pMsg); +void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); // --------------------------------------------- typedef struct SyncAppendEntriesReply { @@ -227,9 +294,19 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild(); void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); +char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len); +SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len); void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); +SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); +char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg); + +// for debug ---------------------- +void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index be25675db4..6ba27b0d8a 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -46,8 +46,12 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); -void syncEntryPrint(const SSyncRaftEntry* pEntry); -void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry); + +// for debug ---------------------- +void syncEntryPrint(const SSyncRaftEntry* pObj); +void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj); +void syncEntryLog(const SSyncRaftEntry* pObj); +void syncEntryLog2(char* s, const SSyncRaftEntry* pObj); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index d59b3206b5..59b5fa94db 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -32,39 +32,24 @@ typedef struct SSyncLogStoreData { SWal* pWal; } SSyncLogStoreData; -SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); - -void logStoreDestory(SSyncLogStore* pLogStore); - -// append one log entry -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); - -// get one log entry, user need to free pEntry->pCont +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); +void logStoreDestory(SSyncLogStore* pLogStore); +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); - -// truncate log with index, entries after the given index (>=index) will be deleted -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); - -// return index of last entry -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); - -// return term of last entry -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); - -// update log store commit index with "index" -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); - -// return commit index of log -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); - +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); - -cJSON* logStore2Json(SSyncLogStore* pLogStore); - -char* logStore2Str(SSyncLogStore* pLogStore); +cJSON* logStore2Json(SSyncLogStore* pLogStore); +char* logStore2Str(SSyncLogStore* pLogStore); // for debug void logStorePrint(SSyncLogStore* pLogStore); +void logStorePrint2(char* s, SSyncLogStore* pLogStore); +void logStoreLog(SSyncLogStore* pLogStore); +void logStoreLog2(char* s, SSyncLogStore* pLogStore); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 1c25b799b4..4058d3bd1c 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -42,7 +42,12 @@ int32_t raftStoreClose(SRaftStore *pRaftStore); int32_t raftStorePersist(SRaftStore *pRaftStore); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); -void raftStorePrint(SRaftStore *pRaftStore); + +// for debug ------------------- +void raftStorePrint(SRaftStore *pObj); +void raftStorePrint2(char *s, SRaftStore *pObj); +void raftStoreLog(SRaftStore *pObj); +void raftStoreLog2(char *s, SRaftStore *pObj); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index ae9cfe8d01..5bc240e921 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -48,6 +48,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); char * voteGranted2Str(SVotesGranted *pVotesGranted); +// for debug ------------------- +void voteGrantedPrint(SVotesGranted *pObj); +void voteGrantedPrint2(char *s, SVotesGranted *pObj); +void voteGrantedLog(SVotesGranted *pObj); +void voteGrantedLog2(char *s, SVotesGranted *pObj); + // SVotesRespond ----------------------------- typedef struct SVotesRespond { SRaftId (*replicas)[TSDB_MAX_REPLICA]; @@ -65,6 +71,12 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); cJSON * votesRespond2Json(SVotesRespond *pVotesRespond); char * votesRespond2Str(SVotesRespond *pVotesRespond); +// for debug ------------------- +void votesRespondPrint(SVotesRespond *pObj); +void votesRespondPrint2(char *s, SVotesRespond *pObj); +void votesRespondLog(SVotesRespond *pObj); +void votesRespondLog2(char *s, SVotesRespond *pObj); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index d2de22b66c..af97c4663c 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -263,7 +263,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg; - pSyncMsg = SyncRequestVoteReplyBuild(); + pSyncMsg = syncRequestVoteReplyBuild(); syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index fff54638e2..9567938197 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -97,4 +97,31 @@ char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; +} + +// for debug ------------------- +void syncIndexMgrPrint(SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + printf("syncIndexMgrPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + printf("syncIndexMgrPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncIndexMgrLog(SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + sTrace("syncIndexMgrLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) { + char *serialized = syncIndexMgr2Str(pObj); + sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5cd7149b72..860dd96cdf 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -327,11 +327,31 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } -void syncNodePrint(char* s, const SSyncNode* pSyncNode) { - char* ss = syncNode2Str(pSyncNode); - // sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); - fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); - free(ss); +// for debug -------------- +void syncNodePrint(SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncNodePrint2(char* s, SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncNodeLog(SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncNodeLog2(char* s, SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index baef49d748..5a55bbc11f 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -23,39 +23,47 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { // in compiler optimization, switch case = if else constants if (pRpcMsg->msgType == SYNC_TIMEOUT) { - SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; + SyncTimeout* pSyncMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncTimeout2Json(pSyncMsg); + syncTimeoutDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_PING) { - SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; + SyncPing* pSyncMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPing2Json(pSyncMsg); + syncPingDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { - SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; + SyncPingReply* pSyncMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncPingReply2Json(pSyncMsg); + syncPingReplyDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { - SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont; + SyncClientRequest* pSyncMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncClientRequest2Json(pSyncMsg); + syncClientRequestDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { - SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; + SyncRequestVote* pSyncMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVote2Json(pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { - SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; + SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncRequestVoteReply2Json(pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { - SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; + SyncAppendEntries* pSyncMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntries2Json(pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; + SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); pRoot = syncAppendEntriesReply2Json(pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); } else { pRoot = syncRpcUnknownMsg2Json(); @@ -72,7 +80,7 @@ cJSON* syncRpcUnknownMsg2Json() { cJSON_AddStringToObject(pRoot, "data", "known message"); cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + cJSON_AddItemToObject(pJson, "SyncUnknown", pRoot); return pJson; } @@ -83,6 +91,33 @@ char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) { return serialized; } +// for debug ---------------------- +void syncRpcMsgPrint(SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + printf("syncRpcMsgPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + printf("syncRpcMsgPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncRpcMsgLog(SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + sTrace("syncRpcMsgLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) { + char* serialized = syncRpcMsg2Str(pMsg); + sTrace("syncRpcMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ---- message process SyncTimeout---- SyncTimeout* syncTimeoutBuild() { uint32_t bytes = sizeof(SyncTimeout); @@ -93,6 +128,15 @@ SyncTimeout* syncTimeoutBuild() { return pMsg; } +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data) { + SyncTimeout* pMsg = syncTimeoutBuild(); + pMsg->timeoutType = timeoutType; + pMsg->logicClock = logicClock; + pMsg->timerMS = timerMS; + pMsg->data = data; + return pMsg; +} + void syncTimeoutDestroy(SyncTimeout* pMsg) { if (pMsg != NULL) { free(pMsg); @@ -109,6 +153,25 @@ void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { assert(len == pMsg->bytes); } +char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncTimeoutSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncTimeout* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncTimeoutDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -121,6 +184,11 @@ void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { char u64buf[128]; @@ -139,18 +207,43 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { return pJson; } -SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data) { - SyncTimeout* pMsg = syncTimeoutBuild(); - pMsg->timeoutType = timeoutType; - pMsg->logicClock = logicClock; - pMsg->timerMS = timerMS; - pMsg->data = data; - return pMsg; +char* syncTimeout2Str(const SyncTimeout* pMsg) { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncTimeoutPrint(const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + printf("syncTimeoutPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + printf("syncTimeoutPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncTimeoutLog(const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + sTrace("syncTimeoutLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) { + char* serialized = syncTimeout2Str(pMsg); + sTrace("syncTimeoutLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { - uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; + uint32_t bytes = sizeof(SyncPing) + dataLen; SyncPing* pMsg = malloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; @@ -159,6 +252,20 @@ SyncPing* syncPingBuild(uint32_t dataLen) { return pMsg; } +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPing* pMsg = syncPingBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping"); + return pMsg; +} + void syncPingDestroy(SyncPing* pMsg) { if (pMsg != NULL) { free(pMsg); @@ -173,7 +280,26 @@ void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { memcpy(pMsg, buf, len); assert(len == pMsg->bytes); - assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); + assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); +} + +char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncPingSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPing* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncPingDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; } void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { @@ -188,6 +314,11 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncPing2Json(const SyncPing* pMsg) { char u64buf[128]; @@ -226,35 +357,75 @@ cJSON* syncPing2Json(const SyncPing* pMsg) { cJSON_AddItemToObject(pRoot, "destId", pDestId); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", pMsg->data); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncPing", pRoot); return pJson; } -SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { - uint32_t dataLen = strlen(str) + 1; - SyncPing* pMsg = syncPingBuild(dataLen); +char* syncPing2Str(const SyncPing* pMsg) { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncPingPrint(const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + printf("syncPingPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncPingPrint2(char* s, const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + printf("syncPingPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncPingLog(const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + sTrace("syncPingLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncPingLog2(char* s, const SyncPing* pMsg) { + char* serialized = syncPing2Str(pMsg); + sTrace("syncPingLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + +// ---- message process SyncPingReply---- +SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncPingReply) + dataLen; + SyncPingReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING_REPLY; + pMsg->dataLen = dataLen; + return pMsg; +} + +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPingReply* pMsg = syncPingReplyBuild(dataLen); pMsg->srcId = *srcId; pMsg->destId = *destId; snprintf(pMsg->data, pMsg->dataLen, "%s", str); return pMsg; } -SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) { - SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping"); - return pMsg; -} - -// ---- message process SyncPingReply---- -SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { - uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen; - SyncPingReply* pMsg = malloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->msgType = SYNC_PING_REPLY; - pMsg->dataLen = dataLen; +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, "pang"); return pMsg; } @@ -272,7 +443,26 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { memcpy(pMsg, buf, len); assert(len == pMsg->bytes); - assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); + assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); +} + +char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncPingReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncPingReply* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncPingReplyDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; } void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { @@ -287,6 +477,11 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { char u64buf[128]; @@ -325,25 +520,51 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { cJSON_AddItemToObject(pRoot, "destId", pDestId); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", pMsg->data); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); return pJson; } -SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { - uint32_t dataLen = strlen(str) + 1; - SyncPingReply* pMsg = syncPingReplyBuild(dataLen); - pMsg->srcId = *srcId; - pMsg->destId = *destId; - snprintf(pMsg->data, pMsg->dataLen, "%s", str); - return pMsg; +char* syncPingReply2Str(const SyncPingReply* pMsg) { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; } -SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) { - SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, "pang"); - return pMsg; +// for debug ---------------------- +void syncPingReplyPrint(const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + printf("syncPingReplyPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + printf("syncPingReplyPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncPingReplyLog(const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + sTrace("syncPingReplyLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) { + char* serialized = syncPingReply2Str(pMsg); + sTrace("syncPingReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } // ---- message process SyncClientRequest---- @@ -359,6 +580,15 @@ SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { return pMsg; } +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { + SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); + pMsg->originalRpcType = pOriginalRpcMsg->msgType; + pMsg->seqNum = seqNum; + pMsg->isWeak = isWeak; + memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); + return pMsg; +} + void syncClientRequestDestroy(SyncClientRequest* pMsg) { if (pMsg != NULL) { free(pMsg); @@ -375,6 +605,25 @@ void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientReque assert(len == pMsg->bytes); } +char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncClientRequestSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncClientRequest* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncClientRequestDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -387,6 +636,11 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { char u64buf[128]; @@ -399,18 +653,51 @@ cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); + cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); return pJson; } -SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { - SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); - pMsg->originalRpcType = pOriginalRpcMsg->msgType; - pMsg->seqNum = seqNum; - pMsg->isWeak = isWeak; - memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); - return pMsg; +char* syncClientRequest2Str(const SyncClientRequest* pMsg) { + cJSON* pJson = syncClientRequest2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncClientRequestPrint(const SyncClientRequest* pMsg) { + char* serialized = syncClientRequest2Str(pMsg); + printf("syncClientRequestPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg) { + char* serialized = syncClientRequest2Str(pMsg); + printf("syncClientRequestPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncClientRequestLog(const SyncClientRequest* pMsg) { + char* serialized = syncClientRequest2Str(pMsg); + sTrace("syncClientRequestLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { + char* serialized = syncClientRequest2Str(pMsg); + sTrace("syncClientRequestLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } // ---- message process SyncRequestVote---- @@ -439,6 +726,25 @@ void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* assert(len == pMsg->bytes); } +char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncRequestVoteSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncRequestVote* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncRequestVoteDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -451,6 +757,11 @@ void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { char u64buf[128]; @@ -499,8 +810,42 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { return pJson; } +char* syncRequestVote2Str(const SyncRequestVote* pMsg) { + cJSON* pJson = syncRequestVote2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncRequestVotePrint(const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + printf("syncRequestVotePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + printf("syncRequestVotePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncRequestVoteLog(const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + sTrace("syncRequestVoteLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) { + char* serialized = syncRequestVote2Str(pMsg); + sTrace("syncRequestVoteLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ---- message process SyncRequestVoteReply---- -SyncRequestVoteReply* SyncRequestVoteReplyBuild() { +SyncRequestVoteReply* syncRequestVoteReplyBuild() { uint32_t bytes = sizeof(SyncRequestVoteReply); SyncRequestVoteReply* pMsg = malloc(bytes); memset(pMsg, 0, bytes); @@ -525,6 +870,25 @@ void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestV assert(len == pMsg->bytes); } +char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncRequestVoteReply* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncRequestVoteReplyDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -537,6 +901,11 @@ void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply syncRequestVoteReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { char u64buf[128]; @@ -582,9 +951,43 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) { return pJson; } +char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg) { + cJSON* pJson = syncRequestVoteReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + printf("syncRequestVoteReplyPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + printf("syncRequestVoteReplyPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + sTrace("syncRequestVoteReplyLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) { + char* serialized = syncRequestVoteReply2Str(pMsg); + sTrace("syncRequestVoteReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ---- message process SyncAppendEntries---- SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { - uint32_t bytes = SYNC_APPEND_ENTRIES_FIX_LEN + dataLen; + uint32_t bytes = sizeof(SyncAppendEntries) + dataLen; SyncAppendEntries* pMsg = malloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; @@ -607,7 +1010,26 @@ void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32 void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { memcpy(pMsg, buf, len); assert(len == pMsg->bytes); - assert(pMsg->bytes == SYNC_APPEND_ENTRIES_FIX_LEN + pMsg->dataLen); + assert(pMsg->bytes == sizeof(SyncAppendEntries) + pMsg->dataLen); +} + +char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncAppendEntriesSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncAppendEntries* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncAppendEntriesDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; } void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg) { @@ -622,6 +1044,11 @@ void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg syncAppendEntriesDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { char u64buf[128]; @@ -669,13 +1096,53 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { cJSON_AddStringToObject(pRoot, "commit_index", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); - cJSON_AddStringToObject(pRoot, "data", pMsg->data); + char* s; + s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntries", pRoot); return pJson; } +char* syncAppendEntries2Str(const SyncAppendEntries* pMsg) { + cJSON* pJson = syncAppendEntries2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncAppendEntriesPrint(const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + printf("syncAppendEntriesPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + printf("syncAppendEntriesPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncAppendEntriesLog(const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + sTrace("syncAppendEntriesLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { + char* serialized = syncAppendEntries2Str(pMsg); + sTrace("syncAppendEntriesLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ---- message process SyncAppendEntriesReply---- SyncAppendEntriesReply* syncAppendEntriesReplyBuild() { uint32_t bytes = sizeof(SyncAppendEntriesReply); @@ -702,6 +1169,25 @@ void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppend assert(len == pMsg->bytes); } +char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) { + char* buf = malloc(pMsg->bytes); + assert(buf != NULL); + syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncAppendEntriesReply* pMsg = malloc(bytes); + assert(pMsg != NULL); + syncAppendEntriesReplyDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) { memset(pRpcMsg, 0, sizeof(*pRpcMsg)); pRpcMsg->msgType = pMsg->msgType; @@ -714,6 +1200,11 @@ void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesR syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); } +SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + return pMsg; +} + cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { char u64buf[128]; @@ -758,4 +1249,38 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); return pJson; +} + +char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + printf("syncAppendEntriesReplyPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + printf("syncAppendEntriesReplyPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + sTrace("syncAppendEntriesReplyLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + sTrace("syncAppendEntriesReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 959bf49ee7..f29b3022d8 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -104,14 +104,29 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { return serialized; } -void syncEntryPrint(const SSyncRaftEntry* pEntry) { - char* s = syncEntry2Str(pEntry); - sTrace("%s", s); - free(s); +// for debug ---------------------- +void syncEntryPrint(const SSyncRaftEntry* pObj) { + char* serialized = syncEntry2Str(pObj); + printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); } -void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) { - char* ss = syncEntry2Str(pEntry); - sTrace("%s | %s", s, ss); - free(ss); +void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj) { + char* serialized = syncEntry2Str(pObj); + printf("syncEntryPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncEntryLog(const SSyncRaftEntry* pObj) { + char* serialized = syncEntry2Str(pObj); + sTrace("syncEntryLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) { + char* serialized = syncEntry2Str(pObj); + sTrace("syncEntryLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d177d3ac9b..27c8a26154 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -43,7 +43,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) { } } -// append one log entry int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -61,7 +60,6 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { free(serialized); } -// get one log entry, user need to free pEntry->pCont SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -77,14 +75,12 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { return pEntry; } -// truncate log with index, entries after the given index (>=index) will be deleted int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walRollback(pWal, fromIndex); } -// return index of last entry SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -92,7 +88,6 @@ SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { return lastIndex; } -// return term of last entry SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { SyncTerm lastTerm = 0; SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); @@ -103,14 +98,12 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { return lastTerm; } -// update log store commit index with "index" int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walCommit(pWal, index); } -// return commit index of log SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; return pData->pSyncNode->commitIndex; @@ -163,11 +156,29 @@ char* logStore2Str(SSyncLogStore* pLogStore) { return serialized; } -// for debug +// for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { - char* s = logStore2Str(pLogStore); - // sTrace("%s", s); - fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s); + char* serialized = logStore2Str(pLogStore); + printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} - free(s); +void logStorePrint2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStore2Str(pLogStore); + printf("logStorePrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void logStoreLog(SSyncLogStore* pLogStore) { + char* serialized = logStore2Str(pLogStore); + sTrace("logStorePrint | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void logStoreLog2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStore2Str(pLogStore); + sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 7154a21bd1..3a26caa161 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -135,8 +135,30 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; } -void raftStorePrint(SRaftStore *pRaftStore) { - char storeBuf[RAFT_STORE_BLOCK_SIZE]; - raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - printf("%s\n", storeBuf); +// for debug ------------------- +void raftStorePrint(SRaftStore *pObj) { + char serialized[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pObj, serialized, sizeof(serialized)); + printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); +} + +void raftStorePrint2(char *s, SRaftStore *pObj) { + char serialized[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pObj, serialized, sizeof(serialized)); + printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); +} +void raftStoreLog(SRaftStore *pObj) { + char serialized[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pObj, serialized, sizeof(serialized)); + sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized); + fflush(NULL); +} + +void raftStoreLog2(char *s, SRaftStore *pObj) { + char serialized[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pObj, serialized, sizeof(serialized)); + sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + fflush(NULL); } diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index a2f10ce339..5c8e70979c 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -119,6 +119,33 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) { return serialized; } +// for debug ------------------- +void voteGrantedPrint(SVotesGranted *pObj) { + char *serialized = voteGranted2Str(pObj); + printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void voteGrantedPrint2(char *s, SVotesGranted *pObj) { + char *serialized = voteGranted2Str(pObj); + printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void voteGrantedLog(SVotesGranted *pObj) { + char *serialized = voteGranted2Str(pObj); + sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void voteGrantedLog2(char *s, SVotesGranted *pObj) { + char *serialized = voteGranted2Str(pObj); + sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // SVotesRespond ----------------------------- SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); @@ -210,4 +237,31 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) { char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; +} + +// for debug ------------------- +void votesRespondPrint(SVotesRespond *pObj) { + char *serialized = votesRespond2Str(pObj); + printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void votesRespondPrint2(char *s, SVotesRespond *pObj) { + char *serialized = votesRespond2Str(pObj); + printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void votesRespondLog(SVotesRespond *pObj) { + char *serialized = votesRespond2Str(pObj); + sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void votesRespondLog2(char *s, SVotesRespond *pObj) { + char *serialized = votesRespond2Str(pObj); + sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index bfde08ffac..2a47b53945 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -1,7 +1,6 @@ add_executable(syncTest "") add_executable(syncEnvTest "") -add_executable(syncPingTest "") -add_executable(syncEncodeTest "") +add_executable(syncPingTimerTest "") add_executable(syncIOTickQTest "") add_executable(syncIOTickPingTest "") add_executable(syncIOSendMsgTest "") @@ -17,6 +16,15 @@ add_executable(syncVotesRespondTest "") add_executable(syncIndexMgrTest "") add_executable(syncLogStoreTest "") add_executable(syncEntryTest "") +add_executable(syncRequestVoteTest "") +add_executable(syncRequestVoteReplyTest "") +add_executable(syncAppendEntriesTest "") +add_executable(syncAppendEntriesReplyTest "") +add_executable(syncClientRequestTest "") +add_executable(syncTimeoutTest "") +add_executable(syncPingTest "") +add_executable(syncPingReplyTest "") +add_executable(syncRpcMsgTest "") target_sources(syncTest @@ -27,13 +35,9 @@ target_sources(syncEnvTest PRIVATE "syncEnvTest.cpp" ) -target_sources(syncPingTest +target_sources(syncPingTimerTest PRIVATE - "syncPingTest.cpp" -) -target_sources(syncEncodeTest - PRIVATE - "syncEncodeTest.cpp" + "syncPingTimerTest.cpp" ) target_sources(syncIOTickQTest PRIVATE @@ -95,6 +99,42 @@ target_sources(syncEntryTest PRIVATE "syncEntryTest.cpp" ) +target_sources(syncRequestVoteTest + PRIVATE + "syncRequestVoteTest.cpp" +) +target_sources(syncRequestVoteReplyTest + PRIVATE + "syncRequestVoteReplyTest.cpp" +) +target_sources(syncAppendEntriesTest + PRIVATE + "syncAppendEntriesTest.cpp" +) +target_sources(syncAppendEntriesReplyTest + PRIVATE + "syncAppendEntriesReplyTest.cpp" +) +target_sources(syncClientRequestTest + PRIVATE + "syncClientRequestTest.cpp" +) +target_sources(syncTimeoutTest + PRIVATE + "syncTimeoutTest.cpp" +) +target_sources(syncPingTest + PRIVATE + "syncPingTest.cpp" +) +target_sources(syncPingReplyTest + PRIVATE + "syncPingReplyTest.cpp" +) +target_sources(syncRpcMsgTest + PRIVATE + "syncRpcMsgTest.cpp" +) target_include_directories(syncTest @@ -107,12 +147,7 @@ target_include_directories(syncEnvTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) -target_include_directories(syncPingTest - PUBLIC - "${CMAKE_SOURCE_DIR}/include/libs/sync" - "${CMAKE_CURRENT_SOURCE_DIR}/../inc" -) -target_include_directories(syncEncodeTest +target_include_directories(syncPingTimerTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" @@ -192,6 +227,51 @@ target_include_directories(syncEntryTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncRequestVoteTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncRequestVoteReplyTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncAppendEntriesTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncAppendEntriesReplyTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncClientRequestTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncTimeoutTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncPingTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncPingReplyTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncRpcMsgTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -202,11 +282,7 @@ target_link_libraries(syncEnvTest sync gtest_main ) -target_link_libraries(syncPingTest - sync - gtest_main -) -target_link_libraries(syncEncodeTest +target_link_libraries(syncPingTimerTest sync gtest_main ) @@ -270,6 +346,42 @@ target_link_libraries(syncEntryTest sync gtest_main ) +target_link_libraries(syncRequestVoteTest + sync + gtest_main +) +target_link_libraries(syncRequestVoteReplyTest + sync + gtest_main +) +target_link_libraries(syncAppendEntriesTest + sync + gtest_main +) +target_link_libraries(syncAppendEntriesReplyTest + sync + gtest_main +) +target_link_libraries(syncClientRequestTest + sync + gtest_main +) +target_link_libraries(syncTimeoutTest + sync + gtest_main +) +target_link_libraries(syncPingTest + sync + gtest_main +) +target_link_libraries(syncPingReplyTest + sync + gtest_main +) +target_link_libraries(syncRpcMsgTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncAppendEntriesReplyTest.cpp b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp new file mode 100644 index 0000000000..362da67c66 --- /dev/null +++ b/source/libs/sync/test/syncAppendEntriesReplyTest.cpp @@ -0,0 +1,100 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncAppendEntriesReply *createMsg() { + SyncAppendEntriesReply *pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->success = true; + pMsg->matchIndex = 77; + return pMsg; +} + +void test1() { + SyncAppendEntriesReply *pMsg = createMsg(); + syncAppendEntriesReplyPrint2((char *)"test1:", pMsg); + syncAppendEntriesReplyDestroy(pMsg); +} + +void test2() { + SyncAppendEntriesReply *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncAppendEntriesReplySerialize(pMsg, serialized, len); + SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyBuild(); + syncAppendEntriesReplyDeserialize(serialized, len, pMsg2); + syncAppendEntriesReplyPrint2((char *)"test2: syncAppendEntriesReplySerialize -> syncAppendEntriesReplyDeserialize ", + pMsg2); + + free(serialized); + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); +} + +void test3() { + SyncAppendEntriesReply *pMsg = createMsg(); + uint32_t len; + char * serialized = syncAppendEntriesReplySerialize2(pMsg, &len); + SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyDeserialize2(serialized, len); + syncAppendEntriesReplyPrint2((char *)"test3: syncAppendEntriesReplySerialize3 -> syncAppendEntriesReplyDeserialize2 ", + pMsg2); + + free(serialized); + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); +} + +void test4() { + SyncAppendEntriesReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyBuild(); + syncAppendEntriesReplyFromRpcMsg(&rpcMsg, pMsg2); + syncAppendEntriesReplyPrint2((char *)"test4: syncAppendEntriesReply2RpcMsg -> syncAppendEntriesReplyFromRpcMsg ", + pMsg2); + + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); +} + +void test5() { + SyncAppendEntriesReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyFromRpcMsg2(&rpcMsg); + syncAppendEntriesReplyPrint2((char *)"test5: syncAppendEntriesReply2RpcMsg -> syncAppendEntriesReplyFromRpcMsg2 ", + pMsg2); + + syncAppendEntriesReplyDestroy(pMsg); + syncAppendEntriesReplyDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncAppendEntriesTest.cpp b/source/libs/sync/test/syncAppendEntriesTest.cpp new file mode 100644 index 0000000000..687d9bcb94 --- /dev/null +++ b/source/libs/sync/test/syncAppendEntriesTest.cpp @@ -0,0 +1,98 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncAppendEntries *createMsg() { + SyncAppendEntries *pMsg = syncAppendEntriesBuild(20); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 11; + pMsg->prevLogTerm = 22; + pMsg->commitIndex = 33; + strcpy(pMsg->data, "hello world"); + return pMsg; +} + +void test1() { + SyncAppendEntries *pMsg = createMsg(); + syncAppendEntriesPrint2((char *)"test1:", pMsg); + syncAppendEntriesDestroy(pMsg); +} + +void test2() { + SyncAppendEntries *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncAppendEntriesSerialize(pMsg, serialized, len); + SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen); + syncAppendEntriesDeserialize(serialized, len, pMsg2); + syncAppendEntriesPrint2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2); + + free(serialized); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test3() { + SyncAppendEntries *pMsg = createMsg(); + uint32_t len; + char * serialized = syncAppendEntriesSerialize2(pMsg, &len); + SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len); + syncAppendEntriesPrint2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2); + + free(serialized); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test4() { + SyncAppendEntries *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries *pMsg2 = (SyncAppendEntries *)malloc(rpcMsg.contLen); + syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); + syncAppendEntriesPrint2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2); + + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test5() { + SyncAppendEntries *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg); + syncAppendEntriesPrint2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2); + + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncClientRequestTest.cpp b/source/libs/sync/test/syncClientRequestTest.cpp new file mode 100644 index 0000000000..6323f53a03 --- /dev/null +++ b/source/libs/sync/test/syncClientRequestTest.cpp @@ -0,0 +1,96 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncClientRequest *createMsg() { + SRpcMsg rpcMsg; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.msgType = 12345; + rpcMsg.contLen = 20; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + strcpy((char *)rpcMsg.pCont, "hello rpc"); + SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true); + return pMsg; +} + +void test1() { + SyncClientRequest *pMsg = createMsg(); + syncClientRequestPrint2((char *)"test1:", pMsg); + syncClientRequestDestroy(pMsg); +} + +void test2() { + SyncClientRequest *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncClientRequestSerialize(pMsg, serialized, len); + SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen); + syncClientRequestDeserialize(serialized, len, pMsg2); + syncClientRequestPrint2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2); + + free(serialized); + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +void test3() { + SyncClientRequest *pMsg = createMsg(); + uint32_t len; + char * serialized = syncClientRequestSerialize2(pMsg, &len); + SyncClientRequest *pMsg2 = syncClientRequestDeserialize2(serialized, len); + syncClientRequestPrint2((char *)"test3: syncClientRequestSerialize3 -> syncClientRequestDeserialize2 ", pMsg2); + + free(serialized); + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +void test4() { + SyncClientRequest *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + SyncClientRequest *pMsg2 = (SyncClientRequest *)malloc(rpcMsg.contLen); + syncClientRequestFromRpcMsg(&rpcMsg, pMsg2); + syncClientRequestPrint2((char *)"test4: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg ", pMsg2); + + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +void test5() { + SyncClientRequest *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg); + syncClientRequestPrint2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2); + + syncClientRequestDestroy(pMsg); + syncClientRequestDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp deleted file mode 100644 index 6197621051..0000000000 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ /dev/null @@ -1,509 +0,0 @@ -#include -#include "syncEnv.h" -#include "syncIO.h" -#include "syncInt.h" -#include "syncMessage.h" -#include "syncUtil.h" - -void logTest() { - sTrace("--- sync log test: trace"); - sDebug("--- sync log test: debug"); - sInfo("--- sync log test: info"); - sWarn("--- sync log test: warn"); - sError("--- sync log test: error"); - sFatal("--- sync log test: fatal"); -} - -#define PING_MSG_LEN 20 -#define APPEND_ENTRIES_VALUE_LEN 32 - -void test1() { - sTrace("test1: ---- syncPingSerialize, syncPingDeserialize"); - - char msg[PING_MSG_LEN]; - snprintf(msg, sizeof(msg), "%s", "test ping"); - SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); - pMsg->destId.vgId = 100; - memcpy(pMsg->data, msg, PING_MSG_LEN); - - { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - uint32_t bufLen = pMsg->bytes; - char* buf = (char*)malloc(bufLen); - syncPingSerialize(pMsg, buf, bufLen); - - SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); - syncPingDeserialize(buf, bufLen, pMsg2); - - { - cJSON* pJson = syncPing2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncPingDestroy(pMsg); - syncPingDestroy(pMsg2); - free(buf); -} - -void test2() { - sTrace("test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg"); - - char msg[PING_MSG_LEN]; - snprintf(msg, sizeof(msg), "%s", "hello raft"); - SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 3333); - pMsg->srcId.vgId = 200; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 4444); - pMsg->destId.vgId = 200; - memcpy(pMsg->data, msg, PING_MSG_LEN); - - { - cJSON* pJson = syncPing2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SRpcMsg rpcMsg; - syncPing2RpcMsg(pMsg, &rpcMsg); - SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); - syncPingFromRpcMsg(&rpcMsg, pMsg2); - rpcFreeCont(rpcMsg.pCont); - - { - cJSON* pJson = syncPing2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncPingDestroy(pMsg); - syncPingDestroy(pMsg2); -} - -void test3() { - sTrace("test3: ---- syncPingReplySerialize, syncPingReplyDeserialize"); - - char msg[PING_MSG_LEN]; - snprintf(msg, sizeof(msg), "%s", "test ping"); - SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 5555); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 6666); - pMsg->destId.vgId = 100; - memcpy(pMsg->data, msg, PING_MSG_LEN); - - { - cJSON* pJson = syncPingReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - uint32_t bufLen = pMsg->bytes; - char* buf = (char*)malloc(bufLen); - syncPingReplySerialize(pMsg, buf, bufLen); - - SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); - syncPingReplyDeserialize(buf, bufLen, pMsg2); - - { - cJSON* pJson = syncPingReply2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncPingReplyDestroy(pMsg); - syncPingReplyDestroy(pMsg2); - free(buf); -} - -void test4() { - sTrace("test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg"); - - char msg[PING_MSG_LEN]; - snprintf(msg, sizeof(msg), "%s", "hello raft"); - SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 7777); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 8888); - pMsg->destId.vgId = 100; - memcpy(pMsg->data, msg, PING_MSG_LEN); - - { - cJSON* pJson = syncPingReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SRpcMsg rpcMsg; - syncPingReply2RpcMsg(pMsg, &rpcMsg); - SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); - syncPingReplyFromRpcMsg(&rpcMsg, pMsg2); - rpcFreeCont(rpcMsg.pCont); - - { - cJSON* pJson = syncPingReply2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncPingReplyDestroy(pMsg); - syncPingReplyDestroy(pMsg2); -} - -void test5() { - sTrace("test5: ---- syncRequestVoteSerialize, syncRequestVoteDeserialize"); - - SyncRequestVote* pMsg = syncRequestVoteBuild(); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); - pMsg->destId.vgId = 100; - pMsg->currentTerm = 20; - pMsg->lastLogIndex = 21; - pMsg->lastLogTerm = 22; - - { - cJSON* pJson = syncRequestVote2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - uint32_t bufLen = pMsg->bytes; - char* buf = (char*)malloc(bufLen); - syncRequestVoteSerialize(pMsg, buf, bufLen); - - SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes); - syncRequestVoteDeserialize(buf, bufLen, pMsg2); - - { - cJSON* pJson = syncRequestVote2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncRequestVoteDestroy(pMsg); - syncRequestVoteDestroy(pMsg2); - free(buf); -} - -void test6() { - sTrace("test6: ---- syncRequestVote2RpcMsg, syncRequestVoteFromRpcMsg"); - - SyncRequestVote* pMsg = syncRequestVoteBuild(); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); - pMsg->destId.vgId = 100; - pMsg->currentTerm = 20; - pMsg->lastLogIndex = 21; - pMsg->lastLogTerm = 22; - - { - cJSON* pJson = syncRequestVote2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SRpcMsg rpcMsg; - syncRequestVote2RpcMsg(pMsg, &rpcMsg); - SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes); - syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2); - rpcFreeCont(rpcMsg.pCont); - - { - cJSON* pJson = syncRequestVote2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncRequestVoteDestroy(pMsg); - syncRequestVoteDestroy(pMsg2); -} - -void test7() { - sTrace("test7: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"); - - SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); - pMsg->destId.vgId = 100; - pMsg->term = 20; - pMsg->voteGranted = 1; - - { - cJSON* pJson = syncRequestVoteReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - uint32_t bufLen = pMsg->bytes; - char* buf = (char*)malloc(bufLen); - syncRequestVoteReplySerialize(pMsg, buf, bufLen); - - SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes); - syncRequestVoteReplyDeserialize(buf, bufLen, pMsg2); - - { - cJSON* pJson = syncRequestVoteReply2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncRequestVoteReplyDestroy(pMsg); - syncRequestVoteReplyDestroy(pMsg2); - free(buf); -} - -void test8() { - sTrace("test8: ---- syncRequestVoteReply2RpcMsg, syncRequestVoteReplyFromRpcMsg"); - - SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild(); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678); - pMsg->destId.vgId = 100; - pMsg->term = 20; - pMsg->voteGranted = 1; - - { - cJSON* pJson = syncRequestVoteReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SRpcMsg rpcMsg; - syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); - SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes); - syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2); - rpcFreeCont(rpcMsg.pCont); - - { - cJSON* pJson = syncRequestVoteReply2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncRequestVoteReplyDestroy(pMsg); - syncRequestVoteReplyDestroy(pMsg2); -} - -void test9() { - sTrace("test9: ---- syncAppendEntriesSerialize, syncAppendEntriesDeserialize"); - - char msg[APPEND_ENTRIES_VALUE_LEN]; - snprintf(msg, sizeof(msg), "%s", "test value"); - SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); - pMsg->destId.vgId = 100; - pMsg->prevLogIndex = 55; - pMsg->prevLogTerm = 66; - pMsg->commitIndex = 77; - memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN); - - { - cJSON* pJson = syncAppendEntries2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - uint32_t bufLen = pMsg->bytes; - char* buf = (char*)malloc(bufLen); - syncAppendEntriesSerialize(pMsg, buf, bufLen); - - SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes); - syncAppendEntriesDeserialize(buf, bufLen, pMsg2); - - { - cJSON* pJson = syncAppendEntries2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncAppendEntriesDestroy(pMsg); - syncAppendEntriesDestroy(pMsg2); - free(buf); -} - -void test10() { - sTrace("test10: ---- syncAppendEntries2RpcMsg, syncAppendEntriesFromRpcMsg"); - - char msg[APPEND_ENTRIES_VALUE_LEN]; - snprintf(msg, sizeof(msg), "%s", "test value"); - SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); - pMsg->destId.vgId = 100; - pMsg->prevLogIndex = 55; - pMsg->prevLogTerm = 66; - pMsg->commitIndex = 77; - memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN); - - { - cJSON* pJson = syncAppendEntries2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SRpcMsg rpcMsg; - syncAppendEntries2RpcMsg(pMsg, &rpcMsg); - SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes); - syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); - rpcFreeCont(rpcMsg.pCont); - - { - cJSON* pJson = syncAppendEntries2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncAppendEntriesDestroy(pMsg); - syncAppendEntriesDestroy(pMsg2); -} - -void test11() { - sTrace("test11: ---- syncAppendEntriesReplySerialize, syncAppendEntriesReplyDeserialize"); - - SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild(); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); - pMsg->destId.vgId = 100; - pMsg->success = 1; - pMsg->matchIndex = 23; - - { - cJSON* pJson = syncAppendEntriesReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - uint32_t bufLen = pMsg->bytes; - char* buf = (char*)malloc(bufLen); - syncAppendEntriesReplySerialize(pMsg, buf, bufLen); - - SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes); - syncAppendEntriesReplyDeserialize(buf, bufLen, pMsg2); - - { - cJSON* pJson = syncAppendEntriesReply2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncAppendEntriesReplyDestroy(pMsg); - syncAppendEntriesReplyDestroy(pMsg2); - free(buf); -} - -void test12() { - sTrace("test12: ---- syncAppendEntriesReply2RpcMsg, syncAppendEntriesReplyFromRpcMsg"); - - SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild(); - pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111); - pMsg->srcId.vgId = 100; - pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222); - pMsg->destId.vgId = 100; - pMsg->success = 1; - pMsg->matchIndex = 23; - - { - cJSON* pJson = syncAppendEntriesReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); - SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes); - syncAppendEntriesReplyFromRpcMsg(&rpcMsg, pMsg2); - rpcFreeCont(rpcMsg.pCont); - - { - cJSON* pJson = syncAppendEntriesReply2Json(pMsg2); - char* serialized = cJSON_Print(pJson); - printf("\n%s\n\n", serialized); - free(serialized); - cJSON_Delete(pJson); - } - - syncAppendEntriesReplyDestroy(pMsg); - syncAppendEntriesReplyDestroy(pMsg2); -} - -int main() { - // taosInitLog((char*)"syncPingTest.log", 100000, 10); - tsAsyncLog = 0; - sDebugFlag = 143 + 64; - - test1(); - test2(); - test3(); - test4(); - test5(); - test6(); - test7(); - test8(); - test9(); - test10(); - test11(); - test12(); - - return 0; -} diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index b6794544eb..7898fda8c0 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -89,7 +89,7 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = syncInitTest(); assert(pSyncNode != NULL); - syncNodePrint((char*)"syncInitTest", pSyncNode); + syncNodePrint2((char*)"syncInitTest", pSyncNode); initRaftId(pSyncNode); diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp index a5eb748de6..602fdee8c2 100644 --- a/source/libs/sync/test/syncLogStoreTest.cpp +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -83,7 +83,7 @@ SSyncNode* syncInitTest() { return syncNodeInit(); } void logStoreTest() { logStorePrint(pSyncNode->pLogStore); for (int i = 0; i < 5; ++i) { - int32_t dataLen = 10; + int32_t dataLen = 10; SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); assert(pEntry != NULL); pEntry->msgType = 1; @@ -94,7 +94,7 @@ void logStoreTest() { pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; snprintf(pEntry->data, dataLen, "value%d", i); - //syncEntryPrint2((char*)"write entry:", pEntry); + // syncEntryPrint2((char*)"write entry:", pEntry); pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); syncEntryDestory(pEntry); } @@ -132,8 +132,8 @@ int main(int argc, char** argv) { pSyncNode = syncInitTest(); assert(pSyncNode != NULL); - //syncNodePrint((char*)"syncLogStoreTest", pSyncNode); - //initRaftId(pSyncNode); + // syncNodePrint((char*)"syncLogStoreTest", pSyncNode); + // initRaftId(pSyncNode); logStoreTest(); diff --git a/source/libs/sync/test/syncPingReplyTest.cpp b/source/libs/sync/test/syncPingReplyTest.cpp new file mode 100644 index 0000000000..8e1448e781 --- /dev/null +++ b/source/libs/sync/test/syncPingReplyTest.cpp @@ -0,0 +1,95 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncPingReply *createMsg() { + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + srcId.vgId = 100; + destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + destId.vgId = 100; + SyncPingReply *pMsg = syncPingReplyBuild3(&srcId, &destId); + return pMsg; +} + +void test1() { + SyncPingReply *pMsg = createMsg(); + syncPingReplyPrint2((char *)"test1:", pMsg); + syncPingReplyDestroy(pMsg); +} + +void test2() { + SyncPingReply *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncPingReplySerialize(pMsg, serialized, len); + SyncPingReply *pMsg2 = syncPingReplyBuild(pMsg->dataLen); + syncPingReplyDeserialize(serialized, len, pMsg2); + syncPingReplyPrint2((char *)"test2: syncPingReplySerialize -> syncPingReplyDeserialize ", pMsg2); + + free(serialized); + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} + +void test3() { + SyncPingReply *pMsg = createMsg(); + uint32_t len; + char * serialized = syncPingReplySerialize2(pMsg, &len); + SyncPingReply *pMsg2 = syncPingReplyDeserialize2(serialized, len); + syncPingReplyPrint2((char *)"test3: syncPingReplySerialize3 -> syncPingReplyDeserialize2 ", pMsg2); + + free(serialized); + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} + +void test4() { + SyncPingReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + SyncPingReply *pMsg2 = (SyncPingReply *)malloc(rpcMsg.contLen); + syncPingReplyFromRpcMsg(&rpcMsg, pMsg2); + syncPingReplyPrint2((char *)"test4: syncPingReply2RpcMsg -> syncPingReplyFromRpcMsg ", pMsg2); + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} + +void test5() { + SyncPingReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + SyncPingReply *pMsg2 = syncPingReplyFromRpcMsg2(&rpcMsg); + syncPingReplyPrint2((char *)"test5: syncPingReply2RpcMsg -> syncPingReplyFromRpcMsg2 ", pMsg2); + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 83f1f67eb1..83394b0e77 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -1,9 +1,8 @@ #include #include -#include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" -#include "syncRaftStore.h" +#include "syncMessage.h" #include "syncUtil.h" void logTest() { @@ -15,116 +14,82 @@ void logTest() { sFatal("--- sync log test: fatal"); } -uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; -int32_t replicaNum = 3; -int32_t myIndex = 0; - -SRaftId ids[TSDB_MAX_REPLICA]; -SSyncInfo syncInfo; -SSyncFSM* pFsm; - -SSyncNode* syncNodeInit() { - syncInfo.vgId = 1234; - syncInfo.rpcClient = gSyncIO->clientRpc; - syncInfo.FpSendMsg = syncIOSendMsg; - syncInfo.queue = gSyncIO->pMsgQ; - syncInfo.FpEqMsg = syncIOEqMsg; - syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); - - SSyncCfg* pCfg = &syncInfo.syncCfg; - pCfg->myIndex = myIndex; - pCfg->replicaNum = replicaNum; - - for (int i = 0; i < replicaNum; ++i) { - pCfg->nodeInfo[i].nodePort = ports[i]; - snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - } - - SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); - assert(pSyncNode != NULL); - - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; - gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; - gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; - gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; - gSyncIO->pSyncNode = pSyncNode; - - return pSyncNode; +SyncPing *createMsg() { + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + srcId.vgId = 100; + destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + destId.vgId = 100; + SyncPing *pMsg = syncPingBuild3(&srcId, &destId); + return pMsg; } -SSyncNode* syncInitTest() { return syncNodeInit(); } - -void initRaftId(SSyncNode* pSyncNode) { - for (int i = 0; i < replicaNum; ++i) { - ids[i] = pSyncNode->replicasId[i]; - char* s = syncUtilRaftId2Str(&ids[i]); - printf("raftId[%d] : %s\n", i, s); - free(s); - } +void test1() { + SyncPing *pMsg = createMsg(); + syncPingPrint2((char *)"test1:", pMsg); + syncPingDestroy(pMsg); } -int main(int argc, char** argv) { +void test2() { + SyncPing *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncPingSerialize(pMsg, serialized, len); + SyncPing *pMsg2 = syncPingBuild(pMsg->dataLen); + syncPingDeserialize(serialized, len, pMsg2); + syncPingPrint2((char *)"test2: syncPingSerialize -> syncPingDeserialize ", pMsg2); + + free(serialized); + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); +} + +void test3() { + SyncPing *pMsg = createMsg(); + uint32_t len; + char * serialized = syncPingSerialize2(pMsg, &len); + SyncPing *pMsg2 = syncPingDeserialize2(serialized, len); + syncPingPrint2((char *)"test3: syncPingSerialize3 -> syncPingDeserialize2 ", pMsg2); + + free(serialized); + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); +} + +void test4() { + SyncPing *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + SyncPing *pMsg2 = (SyncPing *)malloc(rpcMsg.contLen); + syncPingFromRpcMsg(&rpcMsg, pMsg2); + syncPingPrint2((char *)"test4: syncPing2RpcMsg -> syncPingFromRpcMsg ", pMsg2); + + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); +} + +void test5() { + SyncPing *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + SyncPing *pMsg2 = syncPingFromRpcMsg2(&rpcMsg); + syncPingPrint2((char *)"test5: syncPing2RpcMsg -> syncPingFromRpcMsg2 ", pMsg2); + + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); +} + +int main() { // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; + logTest(); - myIndex = 0; - if (argc >= 2) { - myIndex = atoi(argv[1]); - } - - int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); - assert(ret == 0); - - ret = syncEnvStart(); - assert(ret == 0); - - SSyncNode* pSyncNode = syncInitTest(); - assert(pSyncNode != NULL); - syncNodePrint((char*)"----1", pSyncNode); - - initRaftId(pSyncNode); - - //--------------------------- - - sTrace("syncNodeStartPingTimer ..."); - ret = syncNodeStartPingTimer(pSyncNode); - assert(ret == 0); - syncNodePrint((char*)"----2", pSyncNode); - - sTrace("sleep ..."); - taosMsleep(10000); - - sTrace("syncNodeStopPingTimer ..."); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); - syncNodePrint((char*)"----3", pSyncNode); - - sTrace("sleep ..."); - taosMsleep(5000); - - sTrace("syncNodeStartPingTimer ..."); - ret = syncNodeStartPingTimer(pSyncNode); - assert(ret == 0); - syncNodePrint((char*)"----4", pSyncNode); - - sTrace("sleep ..."); - taosMsleep(10000); - - sTrace("syncNodeStopPingTimer ..."); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); - syncNodePrint((char*)"----5", pSyncNode); - - while (1) { - sTrace("while 1 sleep ..."); - taosMsleep(1000); - } + test1(); + test2(); + test3(); + test4(); + test5(); return 0; } diff --git a/source/libs/sync/test/syncPingTimerTest.cpp b/source/libs/sync/test/syncPingTimerTest.cpp new file mode 100644 index 0000000000..e69878632f --- /dev/null +++ b/source/libs/sync/test/syncPingTimerTest.cpp @@ -0,0 +1,130 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + syncNodePrint2((char*)"----1", pSyncNode); + + initRaftId(pSyncNode); + + //--------------------------- + + sTrace("syncNodeStartPingTimer ..."); + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + syncNodePrint2((char*)"----2", pSyncNode); + + sTrace("sleep ..."); + taosMsleep(10000); + + sTrace("syncNodeStopPingTimer ..."); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + syncNodePrint2((char*)"----3", pSyncNode); + + sTrace("sleep ..."); + taosMsleep(5000); + + sTrace("syncNodeStartPingTimer ..."); + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + syncNodePrint2((char*)"----4", pSyncNode); + + sTrace("sleep ..."); + taosMsleep(10000); + + sTrace("syncNodeStopPingTimer ..."); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + syncNodePrint2((char*)"----5", pSyncNode); + + while (1) { + sTrace("while 1 sleep ..."); + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp index 71c0138c8d..0c1c9b881e 100644 --- a/source/libs/sync/test/syncRaftStoreTest.cpp +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -22,15 +22,21 @@ int main() { SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); assert(pRaftStore != NULL); - raftStorePrint(pRaftStore); +#if 0 pRaftStore->currentTerm = 100; pRaftStore->voteFor.addr = 200; pRaftStore->voteFor.vgId = 300; - - raftStorePrint(pRaftStore); raftStorePersist(pRaftStore); + raftStorePrint(pRaftStore); +#endif + + ++(pRaftStore->currentTerm); + ++(pRaftStore->voteFor.addr); + ++(pRaftStore->voteFor.vgId); + raftStorePersist(pRaftStore); + raftStorePrint(pRaftStore); return 0; } diff --git a/source/libs/sync/test/syncRequestVoteReplyTest.cpp b/source/libs/sync/test/syncRequestVoteReplyTest.cpp new file mode 100644 index 0000000000..2bce3e4cd6 --- /dev/null +++ b/source/libs/sync/test/syncRequestVoteReplyTest.cpp @@ -0,0 +1,97 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncRequestVoteReply *createMsg() { + SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 77; + pMsg->voteGranted = true; + return pMsg; +} + +void test1() { + SyncRequestVoteReply *pMsg = createMsg(); + syncRequestVoteReplyPrint2((char *)"test1:", pMsg); + syncRequestVoteReplyDestroy(pMsg); +} + +void test2() { + SyncRequestVoteReply *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncRequestVoteReplySerialize(pMsg, serialized, len); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyBuild(); + syncRequestVoteReplyDeserialize(serialized, len, pMsg2); + syncRequestVoteReplyPrint2((char *)"test2: syncRequestVoteReplySerialize -> syncRequestVoteReplyDeserialize ", pMsg2); + + free(serialized); + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test3() { + SyncRequestVoteReply *pMsg = createMsg(); + uint32_t len; + char * serialized = syncRequestVoteReplySerialize2(pMsg, &len); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyDeserialize2(serialized, len); + syncRequestVoteReplyPrint2((char *)"test3: syncRequestVoteReplySerialize3 -> syncRequestVoteReplyDeserialize2 ", + pMsg2); + + free(serialized); + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test4() { + SyncRequestVoteReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyBuild(); + syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2); + syncRequestVoteReplyPrint2((char *)"test4: syncRequestVoteReply2RpcMsg -> syncRequestVoteReplyFromRpcMsg ", pMsg2); + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +void test5() { + SyncRequestVoteReply *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyFromRpcMsg2(&rpcMsg); + syncRequestVoteReplyPrint2((char *)"test5: syncRequestVoteReply2RpcMsg -> syncRequestVoteReplyFromRpcMsg2 ", pMsg2); + + syncRequestVoteReplyDestroy(pMsg); + syncRequestVoteReplyDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncRequestVoteTest.cpp b/source/libs/sync/test/syncRequestVoteTest.cpp new file mode 100644 index 0000000000..7f75ee937b --- /dev/null +++ b/source/libs/sync/test/syncRequestVoteTest.cpp @@ -0,0 +1,97 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncRequestVote *createMsg() { + SyncRequestVote *pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 11; + pMsg->lastLogIndex = 22; + pMsg->lastLogTerm = 33; + return pMsg; +} + +void test1() { + SyncRequestVote *pMsg = createMsg(); + syncRequestVotePrint2((char *)"test1:", pMsg); + syncRequestVoteDestroy(pMsg); +} + +void test2() { + SyncRequestVote *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncRequestVoteSerialize(pMsg, serialized, len); + SyncRequestVote *pMsg2 = syncRequestVoteBuild(); + syncRequestVoteDeserialize(serialized, len, pMsg2); + syncRequestVotePrint2((char *)"test2: syncRequestVoteSerialize -> syncRequestVoteDeserialize ", pMsg2); + + free(serialized); + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); +} + +void test3() { + SyncRequestVote *pMsg = createMsg(); + uint32_t len; + char * serialized = syncRequestVoteSerialize2(pMsg, &len); + SyncRequestVote *pMsg2 = syncRequestVoteDeserialize2(serialized, len); + syncRequestVotePrint2((char *)"test3: syncRequestVoteSerialize3 -> syncRequestVoteDeserialize2 ", pMsg2); + + free(serialized); + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); +} + +void test4() { + SyncRequestVote *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + SyncRequestVote *pMsg2 = syncRequestVoteBuild(); + syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2); + syncRequestVotePrint2((char *)"test4: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg ", pMsg2); + + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); +} + +void test5() { + SyncRequestVote *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + SyncRequestVote *pMsg2 = syncRequestVoteFromRpcMsg2(&rpcMsg); + syncRequestVotePrint2((char *)"test5: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg2 ", pMsg2); + + syncRequestVoteDestroy(pMsg); + syncRequestVoteDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp new file mode 100644 index 0000000000..0331a29f22 --- /dev/null +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -0,0 +1,181 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int gg = 0; +SyncTimeout *createSyncTimeout() { + SyncTimeout *pMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, 999, 333, &gg); + return pMsg; +} + +SyncPing *createSyncPing() { + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + srcId.vgId = 100; + destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + destId.vgId = 100; + SyncPing *pMsg = syncPingBuild3(&srcId, &destId); + return pMsg; +} + +SyncPingReply *createSyncPingReply() { + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + srcId.vgId = 100; + destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + destId.vgId = 100; + SyncPingReply *pMsg = syncPingReplyBuild3(&srcId, &destId); + return pMsg; +} + +SyncClientRequest *createSyncClientRequest() { + SRpcMsg rpcMsg; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.msgType = 12345; + rpcMsg.contLen = 20; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + strcpy((char *)rpcMsg.pCont, "hello rpc"); + SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true); + return pMsg; +} + +SyncRequestVote *createSyncRequestVote() { + SyncRequestVote *pMsg = syncRequestVoteBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->currentTerm = 11; + pMsg->lastLogIndex = 22; + pMsg->lastLogTerm = 33; + return pMsg; +} + +SyncRequestVoteReply *createSyncRequestVoteReply() { + SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->term = 77; + pMsg->voteGranted = true; + return pMsg; +} + +SyncAppendEntries *createSyncAppendEntries() { + SyncAppendEntries *pMsg = syncAppendEntriesBuild(20); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 11; + pMsg->prevLogTerm = 22; + pMsg->commitIndex = 33; + strcpy(pMsg->data, "hello world"); + return pMsg; +} + +SyncAppendEntriesReply *createSyncAppendEntriesReply() { + SyncAppendEntriesReply *pMsg = syncAppendEntriesReplyBuild(); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->success = true; + pMsg->matchIndex = 77; + return pMsg; +} + +void test1() { + SyncTimeout *pMsg = createSyncTimeout(); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test1", &rpcMsg); + syncTimeoutDestroy(pMsg); +} + +void test2() { + SyncPing *pMsg = createSyncPing(); + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test2", &rpcMsg); + syncPingDestroy(pMsg); +} + +void test3() { + SyncPingReply *pMsg = createSyncPingReply(); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test3", &rpcMsg); + syncPingReplyDestroy(pMsg); +} + +void test4() { + SyncRequestVote *pMsg = createSyncRequestVote(); + SRpcMsg rpcMsg; + syncRequestVote2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test4", &rpcMsg); + syncRequestVoteDestroy(pMsg); +} + +void test5() { + SyncRequestVoteReply *pMsg = createSyncRequestVoteReply(); + SRpcMsg rpcMsg; + syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test5", &rpcMsg); + syncRequestVoteReplyDestroy(pMsg); +} + +void test6() { + SyncAppendEntries *pMsg = createSyncAppendEntries(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test6", &rpcMsg); + syncAppendEntriesDestroy(pMsg); +} + +void test7() { + SyncAppendEntriesReply *pMsg = createSyncAppendEntriesReply(); + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test7", &rpcMsg); + syncAppendEntriesReplyDestroy(pMsg); +} + +void test8() { + SyncClientRequest *pMsg = createSyncClientRequest(); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"test8", &rpcMsg); + syncClientRequestDestroy(pMsg); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + test6(); + test7(); + test8(); + + return 0; +} diff --git a/source/libs/sync/test/syncTimeoutTest.cpp b/source/libs/sync/test/syncTimeoutTest.cpp new file mode 100644 index 0000000000..3f46ab5c7c --- /dev/null +++ b/source/libs/sync/test/syncTimeoutTest.cpp @@ -0,0 +1,92 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int gg = 0; + +SyncTimeout *createMsg() { + SyncTimeout *pMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, 999, 333, &gg); + return pMsg; +} + +void test1() { + SyncTimeout *pMsg = createMsg(); + syncTimeoutPrint2((char *)"test1:", pMsg); + syncTimeoutDestroy(pMsg); +} + +void test2() { + SyncTimeout *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)malloc(len); + syncTimeoutSerialize(pMsg, serialized, len); + SyncTimeout *pMsg2 = syncTimeoutBuild(); + syncTimeoutDeserialize(serialized, len, pMsg2); + syncTimeoutPrint2((char *)"test2: syncTimeoutSerialize -> syncTimeoutDeserialize ", pMsg2); + + free(serialized); + syncTimeoutDestroy(pMsg); + syncTimeoutDestroy(pMsg2); +} + +void test3() { + SyncTimeout *pMsg = createMsg(); + uint32_t len; + char * serialized = syncTimeoutSerialize2(pMsg, &len); + SyncTimeout *pMsg2 = syncTimeoutDeserialize2(serialized, len); + syncTimeoutPrint2((char *)"test3: syncTimeoutSerialize3 -> syncTimeoutDeserialize2 ", pMsg2); + + free(serialized); + syncTimeoutDestroy(pMsg); + syncTimeoutDestroy(pMsg2); +} + +void test4() { + SyncTimeout *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pMsg, &rpcMsg); + SyncTimeout *pMsg2 = (SyncTimeout *)malloc(rpcMsg.contLen); + syncTimeoutFromRpcMsg(&rpcMsg, pMsg2); + syncTimeoutPrint2((char *)"test4: syncTimeout2RpcMsg -> syncTimeoutFromRpcMsg ", pMsg2); + + syncTimeoutDestroy(pMsg); + syncTimeoutDestroy(pMsg2); +} + +void test5() { + SyncTimeout *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pMsg, &rpcMsg); + SyncTimeout *pMsg2 = syncTimeoutFromRpcMsg2(&rpcMsg); + syncTimeoutPrint2((char *)"test5: syncTimeout2RpcMsg -> syncTimeoutFromRpcMsg2 ", pMsg2); + + syncTimeoutDestroy(pMsg); + syncTimeoutDestroy(pMsg2); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 504fa3034a..588eb32ffd 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -118,7 +118,7 @@ int main(int argc, char** argv) { } for (int i = 0; i < replicaNum; ++i) { - SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild(); + SyncRequestVoteReply* reply = syncRequestVoteReplyBuild(); reply->destId = pSyncNode->myRaftId; reply->srcId = ids[i]; reply->term = term; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 0b6abef212..76fd6fab4e 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -118,7 +118,7 @@ int main(int argc, char** argv) { } for (int i = 0; i < replicaNum; ++i) { - SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild(); + SyncRequestVoteReply* reply = syncRequestVoteReplyBuild(); reply->destId = pSyncNode->myRaftId; reply->srcId = ids[i]; reply->term = term;