diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index be4bf0e4d2..571d14fe3c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -269,6 +269,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) #if defined(TD_MSG_NUMBER_) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index d5c015bfb2..eedc403493 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -678,24 +678,61 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg); void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); -// on message ---------------------- -int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); +// --------------------------------------------- +typedef enum { + SYNC_LOCAL_CMD_STEP_DOWN = 100, +} ESyncLocalCmd; + +typedef struct SyncLocalCmd { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + int32_t cmd; + SyncTerm sdNewTerm; // step down new term + +} SyncLocalCmd; + +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId); +void syncLocalCmdDestroy(SyncLocalCmd* pMsg); +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len); +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len); +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg); +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg); +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg); +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg); + +// for debug ---------------------- +void syncLocalCmdPrint(const SyncLocalCmd* pMsg); +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg); +void syncLocalCmdLog(const SyncLocalCmd* pMsg); +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); + +// on message ---------------------- +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); -int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); -int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); -int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); + int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); +int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); +int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); +int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index bf60bb55e2..7207343582 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -538,12 +538,12 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + code = syncNodeOnPing(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + code = syncNodeOnPingReply(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d3ae1015d0..7acf5b4003 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -301,13 +301,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + code = syncNodeOnPing(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + code = syncNodeOnPingReply(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 2a37e000e2..06da0eb3df 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -28,13 +28,13 @@ extern "C" { #include "trpc.h" #include "ttimer.h" -#define TIMER_MAX_MS 0x7FFFFFFF -#define ENV_TICK_TIMER_MS 1000 -#define PING_TIMER_MS 5000 -#define ELECT_TIMER_MS_MIN 5000 -#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) +#define TIMER_MAX_MS 0x7FFFFFFF +#define ENV_TICK_TIMER_MS 1000 +#define PING_TIMER_MS 5000 +#define ELECT_TIMER_MS_MIN 2500 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 900 +#define HEARTBEAT_TIMER_MS 1000 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b77652bc81..c1b8527856 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -49,8 +49,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); // process message ---- -int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); // --------------------------------- static void syncNodeFreeCb(void* param) { @@ -1327,8 +1327,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { } // init callback - pSyncNode->FpOnPing = syncNodeOnPingCb; - pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnPing = syncNodeOnPing; + pSyncNode->FpOnPingReply = syncNodeOnPingReply; pSyncNode->FpOnClientRequest = syncNodeOnClientRequest; pSyncNode->FpOnTimeout = syncNodeOnTimer; pSyncNode->FpOnSnapshot = syncNodeOnSnapshot; @@ -3003,18 +3003,19 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { } // on message ---- -int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { - // log state +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) { + sTrace("vgId:%d, recv sync-ping", ths->vgId); + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsgReply, &rpcMsg); /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); -*/ + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + */ syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); syncPingReplyDestroy(pMsgReply); @@ -3022,9 +3023,9 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { return 0; } -int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) { int32_t ret = 0; - syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); + sTrace("vgId:%d, recv sync-ping-reply", ths->vgId); return ret; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 7a8ea0c4d7..3c36633fe8 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3095,3 +3095,153 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { taosMemoryFree(serialized); } } + +// --------------------------------------------- +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncLocalCmd); + SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_LOCAL_CMD; + return pMsg; +} + +void syncLocalCmdDestroy(SyncLocalCmd* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncLocalCmdSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncLocalCmdDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) { + syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); + cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot); + return pJson; +} + +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) { + cJSON* pJson = syncLocalCmd2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncLocalCmdPrint(const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLocalCmdLog(const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncLocalCmd2Str(pMsg); + sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} \ No newline at end of file diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 7296e30e02..af53123421 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -57,7 +57,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, + snprintf(logBuf, sizeof(logBuf), "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex, logStartIndex, logEndIndex); syncNodeEventLog(pSyncNode, logBuf); diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index b9cc7a391d..26dd32942b 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -59,6 +59,7 @@ add_executable(syncRestoreFromSnapshot "") add_executable(syncRaftCfgIndexTest "") add_executable(syncHeartbeatTest "") add_executable(syncHeartbeatReplyTest "") +add_executable(syncLocalCmdTest "") target_sources(syncTest @@ -305,6 +306,10 @@ target_sources(syncHeartbeatReplyTest PRIVATE "syncHeartbeatReplyTest.cpp" ) +target_sources(syncLocalCmdTest + PRIVATE + "syncLocalCmdTest.cpp" +) target_include_directories(syncTest @@ -612,6 +617,11 @@ target_include_directories(syncHeartbeatReplyTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLocalCmdTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -858,6 +868,10 @@ target_link_libraries(syncHeartbeatReplyTest sync gtest_main ) +target_link_libraries(syncLocalCmdTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 95677e592b..8f16be27e7 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -270,7 +270,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index c04ab9b000..d1244546c9 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -191,7 +191,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 8b209c4c9e..9f1a81e7ed 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -186,7 +186,7 @@ int main(int argc, char **argv) { int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, pEntry->index, &pEntry); ASSERT(code == 0); - syncEntryLog2((char *)"==pEntry2==", pEntry2); + syncEntryLog2((char *)"==pEntry==", pEntry); // step5 uint32_t len; diff --git a/source/libs/sync/test/syncIndexTest.cpp b/source/libs/sync/test/syncIndexTest.cpp index 93cd5f79f0..763117c0c9 100644 --- a/source/libs/sync/test/syncIndexTest.cpp +++ b/source/libs/sync/test/syncIndexTest.cpp @@ -13,7 +13,7 @@ void print(SHashObj *pNextIndex) { SRaftId *pRaftId = (SRaftId *)key; - printf("key:<" PRIu64 ", %d>, value:%" PRIu64 " \n", pRaftId->addr, pRaftId->vgId, *p); + printf("key:<%" PRIu64 ", %d>, value:%" PRIu64 " \n", pRaftId->addr, pRaftId->vgId, *p); p = (uint64_t *)taosHashIterate(pNextIndex, p); } } diff --git a/source/libs/sync/test/syncLocalCmdTest.cpp b/source/libs/sync/test/syncLocalCmdTest.cpp new file mode 100644 index 0000000000..de908bf9c1 --- /dev/null +++ b/source/libs/sync/test/syncLocalCmdTest.cpp @@ -0,0 +1,100 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncLocalCmd *createMsg() { + SyncLocalCmd *pMsg = syncLocalCmdBuild(1000); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->sdNewTerm = 123; + pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; + + return pMsg; +} + +void test1() { + SyncLocalCmd *pMsg = createMsg(); + syncLocalCmdLog2((char *)"test1:", pMsg); + syncLocalCmdDestroy(pMsg); +} + +void test2() { + SyncLocalCmd *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char *serialized = (char *)taosMemoryMalloc(len); + syncLocalCmdSerialize(pMsg, serialized, len); + SyncLocalCmd *pMsg2 = syncLocalCmdBuild(1000); + syncLocalCmdDeserialize(serialized, len, pMsg2); + syncLocalCmdLog2((char *)"test2: syncLocalCmdSerialize -> syncLocalCmdDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); +} + +void test3() { + SyncLocalCmd *pMsg = createMsg(); + uint32_t len; + char *serialized = syncLocalCmdSerialize2(pMsg, &len); + SyncLocalCmd *pMsg2 = syncLocalCmdDeserialize2(serialized, len); + syncLocalCmdLog2((char *)"test3: syncLocalCmdSerialize3 -> syncLocalCmdDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); +} + +void test4() { + SyncLocalCmd *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncLocalCmd2RpcMsg(pMsg, &rpcMsg); + SyncLocalCmd *pMsg2 = (SyncLocalCmd *)taosMemoryMalloc(rpcMsg.contLen); + syncLocalCmdFromRpcMsg(&rpcMsg, pMsg2); + syncLocalCmdLog2((char *)"test4: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); +} + +void test5() { + SyncLocalCmd *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncLocalCmd2RpcMsg(pMsg, &rpcMsg); + SyncLocalCmd *pMsg2 = syncLocalCmdFromRpcMsg2(&rpcMsg); + syncLocalCmdLog2((char *)"test5: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); +} + +int main() { + gRaftDetailLog = true; + + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncRaftIdCheck.cpp b/source/libs/sync/test/syncRaftIdCheck.cpp index 65da0f6631..e7ef69da20 100644 --- a/source/libs/sync/test/syncRaftIdCheck.cpp +++ b/source/libs/sync/test/syncRaftIdCheck.cpp @@ -15,7 +15,7 @@ int main(int argc, char** argv) { char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); - printf("" PRIu64 " -> %s:%d \n", u64, host, port); + printf("%" PRIu64 " -> %s:%d \n", u64, host, port); } else if (argc == 3) { uint64_t u64; diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index fd4cade31c..1171ef8785 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -97,8 +97,8 @@ void test1() { sTrace("lastIndex: %" PRId64, lastIndex); sTrace("lastTerm: %" PRIu64, lastTerm); sTrace("syncStartIndex: %" PRId64, syncStartIndex); - sTrace("" PRId64 "'s preIndex: %" PRId64, testIndex, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, testIndex, preTerm); + sTrace("testIndex: %" PRId64 " preIndex: %" PRId64, testIndex, preIndex); + sTrace("testIndex: %" PRId64 " preTerm: %" PRIu64, testIndex, preTerm); if (gAssert) { assert(lastIndex == -1); @@ -170,8 +170,8 @@ void test2() { SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); - sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm); + sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex); + sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm); if (gAssert) { SyncIndex preIndexArr[12] = {-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -292,8 +292,8 @@ void test4() { SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); - sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm); + sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex); + sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm); } logStoreDestory(pLogStore); @@ -354,8 +354,8 @@ void test5() { SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); - sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm); + sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex); + sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm); if (gAssert) { SyncIndex preIndexArr[12] = {9999, 9999, 9999, 9999, 9999, 9999, 5, 6, 7, 8, 9, 10}; diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index d3ba4bc136..adb3deb22d 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -145,7 +145,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index 35daff796f..cad6eec91d 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -58,18 +58,18 @@ void syncRespMgrGetTest(uint64_t i) { if (ret == 1) { printStub(&stub); } else if (ret == 0) { - printf("" PRId64 " notFound \n", i); + printf("%" PRId64 " notFound \n", i); } } void syncRespMgrGetAndDelTest(uint64_t i) { - printf("------syncRespMgrGetAndDelTest-------" PRIu64 "-- \n", i); + printf("------syncRespMgrGetAndDelTest-------%" PRIu64 "-- \n", i); SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub); if (ret == 1) { printStub(&stub); } else if (ret == 0) { - printf("" PRId64 " notFound \n", i); + printf("%" PRId64 " notFound \n", i); } } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 1cdecfe5b3..bdb4d7d2d8 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -154,7 +154,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { - char* s = syncCfg2Str(&(cbMeta.newCfg)); + char* s = syncCfg2Str(&(cbMeta->newCfg)); sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64 ", newCfg:%s", cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s); @@ -308,7 +308,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = TDMT_VND_SUBMIT; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; }