diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 09750864d5..199a895961 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -35,7 +35,7 @@ extern "C" { // /\ Discard(m) // /\ UNCHANGED <> // -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 419f9591b3..d201c3dc03 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -112,24 +112,6 @@ typedef struct SyncAppendEntriesReply { int64_t startTime; } SyncAppendEntriesReply; -SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); -void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); -void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); -char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len); -SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len); -void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); -void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); -SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); -char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg); - -// for debug ---------------------- -void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); -void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); - // --------------------------------------------- typedef struct SyncHeartbeat { uint32_t bytes; @@ -430,7 +412,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg); int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg); -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg); int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg); int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg); @@ -452,14 +434,13 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode); const char* syncTimerTypeStr(enum ESyncTimeoutType timerType); -int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, - SSyncNode* pNode); -int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, - bool isWeak, int32_t vgId); -int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId); +int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); +int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginalRpc, uint64_t seq, bool isWeak, int32_t vgId); +int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId); int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); +int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 91c8d183e3..ce6aad1983 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -138,6 +138,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = pRpcMsg->pCont; + SRpcMsg rpcRsp = {0}; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { @@ -146,7 +147,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // prepare response msg - SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId); + if (code != 0) { + syncLogRecvAppendEntries(ths, pMsg, "build rsp error"); + goto _IGNORE; + } + + SyncAppendEntriesReply* pReply = rpcRsp.pCont; pReply->srcId = ths->myRaftId; pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; @@ -288,7 +295,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _SEND_RESPONSE; _IGNORE: - syncAppendEntriesReplyDestroy(pReply); + rpcFreeCont(rpcRsp.pCont); return 0; _SEND_RESPONSE: @@ -296,10 +303,6 @@ _SEND_RESPONSE: syncLogSendAppendEntriesReply(ths, pReply, ""); // send response - SRpcMsg rpcMsg; - syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); - syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); - syncAppendEntriesReplyDestroy(pReply); - + syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 53d6b5d92f..a6a9389ca6 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -73,8 +73,9 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync #endif } -int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + int32_t ret = 0; + SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 67b1fed491..69405a7aab 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -152,9 +152,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { syncNodeOnAppendEntries(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { - SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); - code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); - syncAppendEntriesReplyDestroy(pSyncMsg); + syncNodeOnAppendEntriesReply(pSyncNode, pMsg); } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); @@ -603,7 +601,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); SRpcMsg rpcMsg = {0}; - int32_t code = syncClientRequestBuildFromRpcMsg(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId); + int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId); if (code != 0) { sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr()); (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); @@ -1796,7 +1794,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pNode = param; if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) { SRpcMsg rpcMsg = {0}; - int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), + int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), pNode->pingTimerMS, pNode); if (code != 0) { sNError(pNode, "failed to build ping msg"); @@ -1826,7 +1824,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { SSyncNode* pNode = pElectTimer->pSyncNode; SRpcMsg rpcMsg = {0}; - int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); + int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); if (code != 0) { sNError(pNode, "failed to build elect msg"); @@ -1865,7 +1863,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (pNode->replicaNum > 1) { if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) { SRpcMsg rpcMsg = {0}; - int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock), + int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock), pNode->heartbeatTimerMS, pNode); if (code != 0) { @@ -1972,7 +1970,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { if (pEntry == NULL) return -1; SRpcMsg rpcMsg = {0}; - int32_t code = syncClientRequestBuildFromNoopEntry(&rpcMsg, pEntry, pNode->vgId); + int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId); syncEntryDestory(pEntry); sNTrace(pNode, "propose msg, type:noop"); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 826549998e..4c36c1a764 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -20,18 +20,18 @@ #include "syncUtil.h" #include "tcoding.h" -int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, +int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, SSyncNode* pNode) { int32_t bytes = sizeof(SyncTimeout); - pTimeoutRpcMsg->pCont = rpcMallocCont(bytes); - pTimeoutRpcMsg->msgType = TDMT_SYNC_TIMEOUT; - pTimeoutRpcMsg->contLen = bytes; - if (pTimeoutRpcMsg->pCont == NULL) { + pMsg->pCont = rpcMallocCont(bytes); + pMsg->msgType = TDMT_SYNC_TIMEOUT; + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - SyncTimeout* pTimeout = pTimeoutRpcMsg->pCont; + SyncTimeout* pTimeout = pMsg->pCont; pTimeout->bytes = bytes; pTimeout->msgType = TDMT_SYNC_TIMEOUT; pTimeout->vgId = pNode->vgId; @@ -42,41 +42,40 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, return 0; } -int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, - bool isWeak, int32_t vgId) { - int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen; - pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); - pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pClientRequestRpcMsg->contLen = bytes; - if (pClientRequestRpcMsg->pCont == NULL) { +int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) { + int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen; + pMsg->pCont = rpcMallocCont(bytes); + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont; + SyncClientRequest* pClientRequest = pMsg->pCont; pClientRequest->bytes = bytes; pClientRequest->vgId = vgId; pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST; - pClientRequest->originalRpcType = pOriginalRpcMsg->msgType; + pClientRequest->originalRpcType = pOriginal->msgType; pClientRequest->seqNum = seqNum; pClientRequest->isWeak = isWeak; - pClientRequest->dataLen = pOriginalRpcMsg->contLen; - memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); + pClientRequest->dataLen = pOriginal->contLen; + memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen); return 0; } -int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) { +int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) { int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes; - pClientRequestRpcMsg->pCont = rpcMallocCont(bytes); - pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; - pClientRequestRpcMsg->contLen = bytes; - if (pClientRequestRpcMsg->pCont == NULL) { + pMsg->pCont = rpcMallocCont(bytes); + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST; + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont; + SyncClientRequest* pClientRequest = pMsg->pCont; pClientRequest->bytes = bytes; pClientRequest->vgId = vgId; pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST; @@ -93,7 +92,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_REQUEST_VOTE; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TDMT_SYNC_REQUEST_VOTE; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -110,7 +109,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) { pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY; pMsg->contLen = bytes; if (pMsg->pCont == NULL) { - terrno = TDMT_SYNC_REQUEST_VOTE; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -139,160 +138,21 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) { return 0; } -// ---- message process SyncAppendEntriesReply---- -SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncAppendEntriesReply); - SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncAppendEntriesReply); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; - return pMsg; -} - -void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncAppendEntriesReplyDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) { - syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { - char u64buf[128] = {0}; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); - cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); - - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); - cJSON_AddStringToObject(pRoot, "term", u64buf); - cJSON_AddNumberToObject(pRoot, "success", pMsg->success); - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); - cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); - snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); - cJSON_AddStringToObject(pRoot, "startTime", u64buf); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); - return pJson; -} - -char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) { - cJSON* pJson = syncAppendEntriesReply2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ---------------------- -void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) { - char* serialized = syncAppendEntriesReply2Str(pMsg); - printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) { - char* serialized = syncAppendEntriesReply2Str(pMsg); - printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) { - char* serialized = syncAppendEntriesReply2Str(pMsg); - sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) { - if (gRaftDetailLog) { - char* serialized = syncAppendEntriesReply2Str(pMsg); - sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); - } + SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont; + pAppendEntriesReply->bytes = bytes; + pAppendEntriesReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; + pAppendEntriesReply->vgId = vgId; + return 0; } // ---- message process SyncHeartbeat---- diff --git a/source/libs/sync/test/syncRpcMsgTest.cpp b/source/libs/sync/test/syncRpcMsgTest.cpp index 3f66cd9d1c..5fb622d791 100644 --- a/source/libs/sync/test/syncRpcMsgTest.cpp +++ b/source/libs/sync/test/syncRpcMsgTest.cpp @@ -45,7 +45,7 @@ SyncClientRequest *createSyncClientRequest() { strcpy((char *)rpcMsg.pCont, "hello rpc"); SRpcMsg clientRequestMsg; - syncClientRequestBuildFromRpcMsg(&clientRequestMsg, &rpcMsg, 123, true, 1000); + syncBuildClientRequest(&clientRequestMsg, &rpcMsg, 123, true, 1000); SyncClientRequest *pMsg = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen); memcpy(pMsg->data, clientRequestMsg.pCont, clientRequestMsg.contLen); return pMsg; diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 15854eeca6..000348bf1b 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -154,7 +154,7 @@ SRpcMsg *step0() { SyncClientRequest *step1(const SRpcMsg *pMsg) { SRpcMsg clientRequestMsg; - syncClientRequestBuildFromRpcMsg(&clientRequestMsg, pMsg, 123, true, 1000); + syncBuildClientRequest(&clientRequestMsg, pMsg, 123, true, 1000); SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen); memcpy(pMsg2->data, clientRequestMsg.pCont, clientRequestMsg.contLen); return pMsg2; diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index ef3e4c3af9..8b5a38224b 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -295,6 +295,24 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg); void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); +SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId); +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen); +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg); +char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len); +SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len); +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg); +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg); +SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg); +char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg); + +// for debug ---------------------- +void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); +void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index a5a30f0273..d4a3cc1872 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -1479,3 +1479,159 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { taosMemoryFree(serialized); } } + +// ---- message process SyncAppendEntriesReply---- +SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncAppendEntriesReply); + SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY; + return pMsg; +} + +void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncAppendEntriesReplyDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) { + syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { + char u64buf[128] = {0}; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + cJSON_AddNumberToObject(pRoot, "success", pMsg->success); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); + cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime); + cJSON_AddStringToObject(pRoot, "startTime", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot); + return pJson; +} + +char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) { + cJSON* pJson = syncAppendEntriesReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncAppendEntriesReply2Str(pMsg); + sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +}