refact: adjust sync snapshotsend rsp
This commit is contained in:
parent
eec0ab8c8d
commit
fd7a499946
|
@ -207,25 +207,6 @@ typedef struct SyncSnapshotRsp {
|
|||
SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
|
||||
} SyncSnapshotRsp;
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId);
|
||||
void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg);
|
||||
char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len);
|
||||
SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len);
|
||||
void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg);
|
||||
SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg);
|
||||
char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncLeaderTransfer {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
|
@ -294,7 +275,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
|||
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
|
||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||
|
@ -323,6 +304,7 @@ int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId);
|
|||
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId);
|
||||
int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId, SFsmCbMeta* pMeta);
|
||||
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
||||
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive
|
|||
|
||||
// on message
|
||||
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg);
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, const SRpcMsg *pMsg);
|
||||
|
||||
// start
|
||||
|
||||
|
|
|
@ -152,9 +152,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
|||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
||||
code = syncNodeOnSnapshot(pSyncNode, pMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
|
||||
SyncSnapshotRsp* pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
|
||||
syncSnapshotRspDestroy(pSyncMsg);
|
||||
code = syncNodeOnSnapshotReply(pSyncNode, pMsg);
|
||||
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||
SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
||||
|
|
|
@ -241,166 +241,21 @@ int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncSnapshotRsp);
|
||||
SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) {
|
||||
int32_t bytes = sizeof(SyncSnapshotRsp);
|
||||
pMsg->pCont = rpcMallocCont(bytes);
|
||||
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncSnapshotRspSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncSnapshotRspDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncSnapshotRspSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg) {
|
||||
syncSnapshotRspDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncSnapshotRsp* pMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
|
||||
char u64buf[128];
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
|
||||
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastIndex);
|
||||
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastTerm);
|
||||
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack);
|
||||
cJSON_AddNumberToObject(pRoot, "code", pMsg->code);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapBeginIndex);
|
||||
cJSON_AddStringToObject(pRoot, "snap-begin", u64buf);
|
||||
pMsg->contLen = bytes;
|
||||
if (pMsg->pCont == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg) {
|
||||
cJSON* pJson = syncSnapshotRsp2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
printf("syncSnapshotRspPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
printf("syncSnapshotRspPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
sTrace("syncSnapshotRspLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
sTrace("syncSnapshotRspLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
|
||||
pPreSnapshotRsp->bytes = bytes;
|
||||
pPreSnapshotRsp->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
||||
pPreSnapshotRsp->vgId = vgId;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ---------------------------------------------
|
||||
|
|
|
@ -545,7 +545,11 @@ _START_RECEIVER:
|
|||
_SEND_REPLY:
|
||||
// build msg
|
||||
; // make complier happy
|
||||
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
|
||||
|
||||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
@ -557,12 +561,8 @@ _SEND_REPLY:
|
|||
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
|
||||
|
||||
// send msg
|
||||
SRpcMsg rpcMsg;
|
||||
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
||||
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
|
||||
syncSnapshotRspDestroy(pRspMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -584,7 +584,10 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
|
|||
snapshotReceiverStartWriter(pReceiver, pMsg);
|
||||
|
||||
// build msg
|
||||
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
|
||||
|
||||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
@ -596,12 +599,8 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
|
|||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||
|
||||
// send msg
|
||||
SRpcMsg rpcMsg;
|
||||
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
||||
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
|
||||
syncSnapshotRspDestroy(pRspMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -623,7 +622,10 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
|
|||
}
|
||||
|
||||
// build msg
|
||||
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
|
||||
|
||||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
@ -635,12 +637,8 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
|
|||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||
|
||||
// send msg
|
||||
SRpcMsg rpcMsg;
|
||||
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
||||
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
|
||||
syncSnapshotRspDestroy(pRspMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -663,7 +661,10 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
|
|||
}
|
||||
|
||||
// build msg
|
||||
SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId);
|
||||
SRpcMsg rpcMsg = {0};
|
||||
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
|
||||
|
||||
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
|
||||
pRspMsg->srcId = pSyncNode->myRaftId;
|
||||
pRspMsg->destId = pMsg->srcId;
|
||||
pRspMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
|
@ -675,12 +676,8 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
|
|||
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
|
||||
|
||||
// send msg
|
||||
SRpcMsg rpcMsg;
|
||||
syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg);
|
||||
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
|
||||
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
|
||||
syncSnapshotRspDestroy(pRspMsg);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -832,7 +829,9 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
|
|||
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
|
||||
// condition 3 sender receives error msg, just print error log
|
||||
//
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
||||
int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
||||
SyncSnapshotRsp *pMsg = pRpcMsg->pCont;
|
||||
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped");
|
||||
|
|
|
@ -427,6 +427,25 @@ void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg);
|
|||
void syncSnapshotSendLog(const SyncSnapshotSend* pMsg);
|
||||
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg);
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId);
|
||||
void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg);
|
||||
char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len);
|
||||
SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len);
|
||||
void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg);
|
||||
SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg);
|
||||
char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
|
||||
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -2458,3 +2458,165 @@ SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncSnapshotRsp);
|
||||
SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncSnapshotRspSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncSnapshotRspDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncSnapshotRspSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg) {
|
||||
syncSnapshotRspDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncSnapshotRsp* pMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
|
||||
char u64buf[128];
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
|
||||
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastIndex);
|
||||
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastTerm);
|
||||
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack);
|
||||
cJSON_AddNumberToObject(pRoot, "code", pMsg->code);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapBeginIndex);
|
||||
cJSON_AddStringToObject(pRoot, "snap-begin", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg) {
|
||||
cJSON* pJson = syncSnapshotRsp2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
printf("syncSnapshotRspPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
printf("syncSnapshotRspPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
sTrace("syncSnapshotRspLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||
sTrace("syncSnapshotRspLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue