refact: build sync heartbeat msg
This commit is contained in:
parent
94076e5919
commit
55927eaf0d
|
@ -125,28 +125,8 @@ typedef struct SyncHeartbeat {
|
|||
SyncIndex commitIndex;
|
||||
SyncTerm privateTerm;
|
||||
SyncTerm minMatchIndex;
|
||||
|
||||
} SyncHeartbeat;
|
||||
|
||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
|
||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
|
||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
|
||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
|
||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
|
||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
|
||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncHeartbeatReply {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
|
@ -420,7 +400,7 @@ int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
|
|||
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
|
||||
|
||||
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
||||
|
@ -441,6 +421,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
|
|||
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
||||
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ extern "C" {
|
|||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||
|
||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncHeartbeat* pMsg);
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
||||
|
||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
|
||||
|
|
|
@ -134,9 +134,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
|||
if (pSyncNode == NULL) return code;
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||
SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||
SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||
|
@ -1192,6 +1190,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
|
|||
pSyncNode->syncSendMSg(&epSet, pMsg);
|
||||
} else {
|
||||
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1913,7 +1912,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
|
||||
if (pSyncNode->replicaNum > 1) {
|
||||
if (timerLogicClock == msgLogicClock) {
|
||||
SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||
|
||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||
pSyncMsg->destId = pData->destId;
|
||||
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
@ -1921,28 +1923,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
|||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
|
||||
// eq msg
|
||||
#if 0
|
||||
if (pSyncNode->syncEqCtrlMsg != NULL) {
|
||||
int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
|
||||
}
|
||||
#endif
|
||||
|
||||
// send msg
|
||||
syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
|
||||
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
|
||||
if (syncIsInit()) {
|
||||
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
|
||||
|
@ -2024,7 +2006,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeat* pMsg = pRpcMsg->pCont;
|
||||
syncLogRecvHeartbeat(ths, pMsg, "");
|
||||
|
||||
SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
|
||||
|
|
|
@ -155,158 +155,23 @@ int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// ---- message process SyncHeartbeat----
|
||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncHeartbeat);
|
||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
|
||||
int32_t bytes = sizeof(SyncHeartbeat);
|
||||
pMsg->pCont = rpcMallocCont(bytes);
|
||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||
return pMsg;
|
||||
pMsg->contLen = bytes;
|
||||
if (pMsg->pCont == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
SyncHeartbeat* pHeartbeat = pMsg->pCont;
|
||||
pHeartbeat->bytes = bytes;
|
||||
pHeartbeat->msgType = TDMT_SYNC_HEARTBEAT;
|
||||
pHeartbeat->vgId = vgId;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncHeartbeatSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncHeartbeatDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) {
|
||||
syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) {
|
||||
cJSON* pJson = syncHeartbeat2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
printf("syncHeartbeatPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
printf("syncHeartbeatPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
sTrace("syncHeartbeatLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
sTrace("syncHeartbeatLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncHeartbeatReply----
|
||||
SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId) {
|
||||
|
|
|
@ -186,19 +186,19 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) {
|
||||
int32_t ret = 0;
|
||||
syncLogSendHeartbeat(pSyncNode, pMsg, "");
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &rpcMsg);
|
||||
return ret;
|
||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, "");
|
||||
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||
}
|
||||
|
||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||
pSyncMsg->destId = pSyncNode->peersId[i];
|
||||
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
@ -206,13 +206,8 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
|||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||
pSyncMsg->privateTerm = 0;
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
|
||||
// send msg
|
||||
syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
|
||||
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -313,6 +313,24 @@ void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
|
|||
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
|
||||
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
|
||||
|
||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
|
||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
|
||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
|
||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
|
||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
|
||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
|
||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1635,3 +1635,156 @@ void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
|
|||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncHeartbeat----
|
||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncHeartbeat);
|
||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncHeartbeatSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncHeartbeatDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) {
|
||||
syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) {
|
||||
cJSON* pJson = syncHeartbeat2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
printf("syncHeartbeatPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
printf("syncHeartbeatPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
sTrace("syncHeartbeatLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
sTrace("syncHeartbeatLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue