enh(sync): add SyncSnapshotRsp SyncSnapshotSend
This commit is contained in:
parent
6be4119fe7
commit
c5526ef915
|
@ -221,6 +221,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_SNAPSHOT_SEND, "vnode-sync-snapshot-send", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_SNAPSHOT_RSP, "vnode-sync-snapshot-rsp", NULL, NULL)
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)
|
||||||
|
|
|
@ -385,6 +385,74 @@ void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
|
||||||
void syncApplyMsgLog(const SyncApplyMsg* pMsg);
|
void syncApplyMsgLog(const SyncApplyMsg* pMsg);
|
||||||
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
|
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
typedef struct SyncSnapshotSend {
|
||||||
|
uint32_t bytes;
|
||||||
|
int32_t vgId;
|
||||||
|
uint32_t msgType;
|
||||||
|
SRaftId srcId;
|
||||||
|
SRaftId destId;
|
||||||
|
|
||||||
|
SyncTerm term;
|
||||||
|
SyncIndex lastIndex;
|
||||||
|
SyncTerm lastTerm;
|
||||||
|
int32_t seq;
|
||||||
|
int32_t ack;
|
||||||
|
uint32_t dataLen;
|
||||||
|
char data[];
|
||||||
|
} SyncSnapshotSend;
|
||||||
|
|
||||||
|
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId);
|
||||||
|
void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg);
|
||||||
|
void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg);
|
||||||
|
char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len);
|
||||||
|
SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg);
|
||||||
|
SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg);
|
||||||
|
char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg);
|
||||||
|
void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg);
|
||||||
|
void syncSnapshotSendLog(const SyncSnapshotSend* pMsg);
|
||||||
|
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg);
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
typedef struct SyncSnapshotRsp {
|
||||||
|
uint32_t bytes;
|
||||||
|
int32_t vgId;
|
||||||
|
uint32_t msgType;
|
||||||
|
SRaftId srcId;
|
||||||
|
SRaftId destId;
|
||||||
|
|
||||||
|
SyncTerm term;
|
||||||
|
SyncIndex lastIndex;
|
||||||
|
SyncTerm lastTerm;
|
||||||
|
int32_t seq;
|
||||||
|
int32_t ack;
|
||||||
|
} SyncSnapshotRsp;
|
||||||
|
|
||||||
|
SyncSnapshotRsp* syncSnapshotRspBuild(uint32_t dataLen, int32_t vgId);
|
||||||
|
void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg);
|
||||||
|
void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg);
|
||||||
|
char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len);
|
||||||
|
SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg);
|
||||||
|
SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg);
|
||||||
|
char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg);
|
||||||
|
void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
|
||||||
|
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
|
||||||
|
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
// on message ----------------------
|
// on message ----------------------
|
||||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||||
|
|
|
@ -230,6 +230,8 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
|
|
@ -1690,3 +1690,324 @@ void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {
|
||||||
sTrace("syncApplyMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncApplyMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
|
||||||
|
SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_VND_SYNC_SNAPSHOT_SEND;
|
||||||
|
pMsg->dataLen = dataLen;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
assert(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
assert(len == pMsg->bytes);
|
||||||
|
assert(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
assert(buf != NULL);
|
||||||
|
syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
assert(pMsg != NULL);
|
||||||
|
syncSnapshotSendDeserialize(buf, len, pMsg);
|
||||||
|
assert(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncSnapshotSendSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) {
|
||||||
|
syncSnapshotSendDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
assert(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "seq", pMsg->seq);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack);
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||||
|
char* s;
|
||||||
|
s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen);
|
||||||
|
cJSON_AddStringToObject(pRoot, "data", s);
|
||||||
|
taosMemoryFree(s);
|
||||||
|
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||||
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
|
taosMemoryFree(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncSnapshotSend", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg) {
|
||||||
|
cJSON* pJson = syncSnapshotSend2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg) {
|
||||||
|
char* serialized = syncSnapshotSend2Str(pMsg);
|
||||||
|
printf("syncSnapshotSendPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg) {
|
||||||
|
char* serialized = syncSnapshotSend2Str(pMsg);
|
||||||
|
printf("syncSnapshotSendPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendLog(const SyncSnapshotSend* pMsg) {
|
||||||
|
char* serialized = syncSnapshotSend2Str(pMsg);
|
||||||
|
sTrace("syncSnapshotSendLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) {
|
||||||
|
char* serialized = syncSnapshotSend2Str(pMsg);
|
||||||
|
sTrace("syncSnapshotSendLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------
|
||||||
|
SyncSnapshotRsp* syncSnapshotRspBuild(uint32_t dataLen, int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncSnapshotRsp);
|
||||||
|
SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_VND_SYNC_SNAPSHOT_RSP;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
assert(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
assert(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
assert(buf != NULL);
|
||||||
|
syncSnapshotRspSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
assert(pMsg != NULL);
|
||||||
|
syncSnapshotRspDeserialize(buf, len, pMsg);
|
||||||
|
assert(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncSnapshotRspSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg) {
|
||||||
|
syncSnapshotRspDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncSnapshotRsp* pMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
assert(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastTerm", u64buf);
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "seq", pMsg->seq);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg) {
|
||||||
|
cJSON* pJson = syncSnapshotRsp2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg) {
|
||||||
|
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||||
|
printf("syncSnapshotRspPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||||
|
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||||
|
printf("syncSnapshotRspPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg) {
|
||||||
|
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||||
|
sTrace("syncSnapshotRspLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||||
|
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||||
|
sTrace("syncSnapshotRspLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
Loading…
Reference in New Issue