diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index c1f382580a..8b98cf0e69 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -219,18 +219,6 @@ typedef struct SyncLeaderTransfer { SRaftId newLeaderId; } SyncLeaderTransfer; -SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId); -void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg); -void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen); -void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg); -char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len); -SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len); -void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg); -void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg); -SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg); -cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg); -char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg); - typedef enum { SYNC_LOCAL_CMD_STEP_DOWN = 100, SYNC_LOCAL_CMD_FOLLOWER_CMT, @@ -305,6 +293,7 @@ int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId); int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta); int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId); int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId); +int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ed416ae6fb..75d3b34f21 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -411,17 +411,15 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort); - SyncLeaderTransfer* pMsg = syncLeaderTransferBuild(pSyncNode->vgId); + SRpcMsg rpcMsg = {0}; + (void)syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId); + + SyncLeaderTransfer* pMsg = rpcMsg.pCont; pMsg->newLeaderId.addr = syncUtilAddr2U64(newLeader.nodeFqdn, newLeader.nodePort); pMsg->newLeaderId.vgId = pSyncNode->vgId; pMsg->newNodeInfo = newLeader; - ASSERT(pMsg != NULL); - SRpcMsg rpcMsg = {0}; - syncLeaderTransfer2RpcMsg(pMsg, &rpcMsg); - syncLeaderTransferDestroy(pMsg); - int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false); - return ret; + return syncNodePropose(pSyncNode, &rpcMsg, false); } SSyncState syncGetState(int64_t rid) { @@ -2210,6 +2208,7 @@ const char* syncStr(ESyncState state) { } } +#if 0 int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { sNTrace(ths, "I am not follower, can not do leader transfer"); @@ -2238,7 +2237,7 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p } */ - SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); + SyncLeaderTransfer* pSyncLeaderTransfer = pRpcMsg->pCont; sNTrace(ths, "do leader transfer, index:%" PRId64, pEntry->index); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); @@ -2271,10 +2270,11 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta); } - syncLeaderTransferDestroy(pSyncLeaderTransfer); return 0; } +#endif + int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { for (int32_t i = 0; i < pNewCfg->replicaNum; ++i) { SRaftId raftId; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index c7cd0a6249..2b5b205f56 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -258,137 +258,21 @@ int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) { return 0; } -// --------------------------------------------- -SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId) { - uint32_t bytes = sizeof(SyncLeaderTransfer); - SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); - memset(pMsg, 0, bytes); - pMsg->bytes = bytes; - pMsg->vgId = vgId; +int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) { + int32_t bytes = sizeof(SyncLeaderTransfer); + pMsg->pCont = rpcMallocCont(bytes); pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER; - return pMsg; -} - -void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) { - if (pMsg != NULL) { - taosMemoryFree(pMsg); - } -} - -void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) { - ASSERT(pMsg->bytes <= bufLen); - memcpy(buf, pMsg, pMsg->bytes); -} - -void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) { - memcpy(pMsg, buf, len); - ASSERT(len == pMsg->bytes); -} - -char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) { - char* buf = taosMemoryMalloc(pMsg->bytes); - ASSERT(buf != NULL); - syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes); - if (len != NULL) { - *len = pMsg->bytes; - } - return buf; -} - -SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) { - uint32_t bytes = *((uint32_t*)buf); - SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); - ASSERT(pMsg != NULL); - syncLeaderTransferDeserialize(buf, len, pMsg); - ASSERT(len == pMsg->bytes); - return pMsg; -} - -void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg) { - memset(pRpcMsg, 0, sizeof(*pRpcMsg)); - pRpcMsg->msgType = pMsg->msgType; - pRpcMsg->contLen = pMsg->bytes; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - syncLeaderTransferSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); -} - -void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg) { - syncLeaderTransferDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); -} - -SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) { - SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - ASSERT(pMsg != NULL); - return pMsg; -} - -cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg) { - char u64buf[128]; - cJSON* pRoot = cJSON_CreateObject(); - - if (pMsg != NULL) { - cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); - cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); - cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); - - /* - cJSON* pSrcId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); - cJSON_AddStringToObject(pSrcId, "addr", u64buf); - { - uint64_t u64 = pMsg->srcId.addr; - cJSON* pTmp = pSrcId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); - cJSON_AddItemToObject(pRoot, "srcId", pSrcId); - - cJSON* pDestId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); - cJSON_AddStringToObject(pDestId, "addr", u64buf); - { - uint64_t u64 = pMsg->destId.addr; - cJSON* pTmp = pDestId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); - cJSON_AddItemToObject(pRoot, "destId", pDestId); - */ - - cJSON* pNewerId = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->newLeaderId.addr); - cJSON_AddStringToObject(pNewerId, "addr", u64buf); - { - uint64_t u64 = pMsg->newLeaderId.addr; - cJSON* pTmp = pNewerId; - char host[128]; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pTmp, "addr_host", host); - cJSON_AddNumberToObject(pTmp, "addr_port", port); - } - cJSON_AddNumberToObject(pNewerId, "vgId", pMsg->newLeaderId.vgId); - cJSON_AddItemToObject(pRoot, "newLeaderId", pNewerId); + pMsg->contLen = bytes; + if (pMsg->pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncLeaderTransfer", pRoot); - return pJson; -} - -char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) { - cJSON* pJson = syncLeaderTransfer2Json(pMsg); - char* serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; + SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont; + pLeaderTransfer->bytes = bytes; + pLeaderTransfer->msgType = TDMT_SYNC_LEADER_TRANSFER; + pLeaderTransfer->vgId = vgId; + return 0; } const char* syncLocalCmdGetStr(int32_t cmd) { diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index 895b566e23..4784587f4a 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -445,7 +445,18 @@ void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg); void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg); void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg); -// --------------------------------------------- +SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId); +void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg); +void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen); +void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg); +char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len); +SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len); +void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg); +void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg); +SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg); +char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c index c1c1f48ee1..5a32e809bf 100644 --- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c +++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c @@ -2620,3 +2620,136 @@ void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) { taosMemoryFree(serialized); } } + +// --------------------------------------------- +SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncLeaderTransfer); + SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER; + return pMsg; +} + +void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncLeaderTransferDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncLeaderTransferSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg) { + syncLeaderTransferDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + /* + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + */ + + cJSON* pNewerId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->newLeaderId.addr); + cJSON_AddStringToObject(pNewerId, "addr", u64buf); + { + uint64_t u64 = pMsg->newLeaderId.addr; + cJSON* pTmp = pNewerId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pNewerId, "vgId", pMsg->newLeaderId.vgId); + cJSON_AddItemToObject(pRoot, "newLeaderId", pNewerId); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncLeaderTransfer", pRoot); + return pJson; +} + +char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) { + cJSON* pJson = syncLeaderTransfer2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +}