refact: sync local cmd msg
This commit is contained in:
parent
d37d83ee26
commit
f2191363b4
|
@ -224,8 +224,6 @@ typedef enum {
|
||||||
SYNC_LOCAL_CMD_FOLLOWER_CMT,
|
SYNC_LOCAL_CMD_FOLLOWER_CMT,
|
||||||
} ESyncLocalCmd;
|
} ESyncLocalCmd;
|
||||||
|
|
||||||
const char* syncLocalCmdGetStr(int32_t cmd);
|
|
||||||
|
|
||||||
typedef struct SyncLocalCmd {
|
typedef struct SyncLocalCmd {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -236,27 +234,8 @@ typedef struct SyncLocalCmd {
|
||||||
int32_t cmd;
|
int32_t cmd;
|
||||||
SyncTerm sdNewTerm; // step down new term
|
SyncTerm sdNewTerm; // step down new term
|
||||||
SyncIndex fcIndex; // follower commit index
|
SyncIndex fcIndex; // follower commit index
|
||||||
|
|
||||||
} SyncLocalCmd;
|
} SyncLocalCmd;
|
||||||
|
|
||||||
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
|
|
||||||
void syncLocalCmdDestroy(SyncLocalCmd* pMsg);
|
|
||||||
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
|
|
||||||
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
|
|
||||||
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
|
|
||||||
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
|
|
||||||
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
|
|
||||||
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
|
|
||||||
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
|
||||||
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg);
|
|
||||||
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
|
|
||||||
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
|
|
||||||
void syncLocalCmdLog(const SyncLocalCmd* pMsg);
|
|
||||||
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
|
|
||||||
|
|
||||||
// on message ----------------------
|
// on message ----------------------
|
||||||
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
|
@ -264,20 +243,18 @@ int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
|
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
|
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
|
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
||||||
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
|
|
||||||
|
|
||||||
// -----------------------------------------
|
|
||||||
|
|
||||||
// option ----------------------------------
|
// option ----------------------------------
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
|
const char* syncTimerTypeStr( ESyncTimeoutType timerType);
|
||||||
|
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd);
|
||||||
|
|
||||||
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
|
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
|
||||||
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId);
|
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seq, bool isWeak, int32_t vgId);
|
||||||
|
@ -294,6 +271,7 @@ int32_t syncBuildApplyMsg(SRpcMsg* pMsg, const SRpcMsg* pOriginal, int32_t vgId,
|
||||||
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
||||||
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId);
|
||||||
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,9 +154,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
|
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
|
||||||
code = syncNodeOnSnapshotReply(pSyncNode, pMsg);
|
code = syncNodeOnSnapshotReply(pSyncNode, pMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||||
SyncLocalCmd* pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
code = syncNodeOnLocalCmd(pSyncNode, pMsg);
|
||||||
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
|
||||||
syncLocalCmdDestroy(pSyncMsg);
|
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
sError("vgId:%d, failed to process msg:%p since invalid type:%s", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -2017,13 +2015,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
|
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
|
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
|
||||||
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
|
SRpcMsg rpcMsgLocalCmd = {0};
|
||||||
|
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||||
|
|
||||||
|
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
|
||||||
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
|
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
|
||||||
pSyncMsg->fcIndex = pMsg->commitIndex;
|
pSyncMsg->fcIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
SRpcMsg rpcMsgLocalCmd;
|
|
||||||
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
|
|
||||||
|
|
||||||
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
|
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
|
||||||
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
|
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -2038,13 +2036,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
// syncNodeStepDown(ths, pMsg->term);
|
// syncNodeStepDown(ths, pMsg->term);
|
||||||
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
|
SRpcMsg rpcMsgLocalCmd = {0};
|
||||||
|
(void)syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId);
|
||||||
|
|
||||||
|
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
|
||||||
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
|
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
|
||||||
pSyncMsg->sdNewTerm = pMsg->term;
|
pSyncMsg->sdNewTerm = pMsg->term;
|
||||||
|
|
||||||
SRpcMsg rpcMsgLocalCmd;
|
|
||||||
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
|
|
||||||
|
|
||||||
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
|
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
|
||||||
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
|
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -2054,8 +2052,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
|
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
syncLocalCmdDestroy(pSyncMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2079,7 +2075,8 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
|
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncLocalCmd* pMsg = pRpcMsg->pCont;
|
||||||
syncLogRecvLocalCmd(ths, pMsg, "");
|
syncLogRecvLocalCmd(ths, pMsg, "");
|
||||||
|
|
||||||
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
||||||
|
@ -2487,18 +2484,6 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
|
|
||||||
if (timerType == SYNC_TIMEOUT_PING) {
|
|
||||||
return "ping";
|
|
||||||
} else if (timerType == SYNC_TIMEOUT_ELECTION) {
|
|
||||||
return "elect";
|
|
||||||
} else if (timerType == SYNC_TIMEOUT_HEARTBEAT) {
|
|
||||||
return "heartbeat";
|
|
||||||
} else {
|
|
||||||
return "unknown";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
|
||||||
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
|
sNTrace(pSyncNode, "recv sync-timer {type:%s, lc:%" PRId64 ", ms:%d, data:%p}, %s",
|
||||||
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
|
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
|
||||||
|
|
|
@ -275,164 +275,43 @@ int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
const char* syncLocalCmdGetStr(int32_t cmd) {
|
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
int32_t bytes = sizeof(SyncLocalCmd);
|
||||||
return "step-down";
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
} else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
|
|
||||||
return "follower-commit";
|
|
||||||
}
|
|
||||||
|
|
||||||
return "unknown-local-cmd";
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
|
|
||||||
uint32_t bytes = sizeof(SyncLocalCmd);
|
|
||||||
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
memset(pMsg, 0, bytes);
|
|
||||||
pMsg->bytes = bytes;
|
|
||||||
pMsg->vgId = vgId;
|
|
||||||
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
||||||
return pMsg;
|
pMsg->contLen = bytes;
|
||||||
|
if (pMsg->pCont == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLocalCmd* pLocalCmd = pMsg->pCont;
|
||||||
|
pLocalCmd->bytes = bytes;
|
||||||
|
pLocalCmd->msgType = TDMT_SYNC_LOCAL_CMD;
|
||||||
|
pLocalCmd->vgId = vgId;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLocalCmdDestroy(SyncLocalCmd* pMsg) {
|
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
|
||||||
if (pMsg != NULL) {
|
switch (timerType) {
|
||||||
taosMemoryFree(pMsg);
|
case SYNC_TIMEOUT_PING:
|
||||||
|
return "ping";
|
||||||
|
case SYNC_TIMEOUT_ELECTION:
|
||||||
|
return "elect";
|
||||||
|
case SYNC_TIMEOUT_HEARTBEAT:
|
||||||
|
return "heartbeat";
|
||||||
|
default:
|
||||||
|
return "unknown";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) {
|
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
|
||||||
ASSERT(pMsg->bytes <= bufLen);
|
switch (cmd) {
|
||||||
memcpy(buf, pMsg, pMsg->bytes);
|
case SYNC_LOCAL_CMD_STEP_DOWN:
|
||||||
}
|
return "step-down";
|
||||||
|
case SYNC_LOCAL_CMD_FOLLOWER_CMT:
|
||||||
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) {
|
return "follower-commit";
|
||||||
memcpy(pMsg, buf, len);
|
default:
|
||||||
ASSERT(len == pMsg->bytes);
|
return "unknown-local-cmd";
|
||||||
}
|
|
||||||
|
|
||||||
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) {
|
|
||||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
syncLocalCmdSerialize(pMsg, buf, pMsg->bytes);
|
|
||||||
if (len != NULL) {
|
|
||||||
*len = pMsg->bytes;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) {
|
|
||||||
uint32_t bytes = *((uint32_t*)buf);
|
|
||||||
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
syncLocalCmdDeserialize(buf, len, pMsg);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) {
|
|
||||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
|
||||||
pRpcMsg->msgType = pMsg->msgType;
|
|
||||||
pRpcMsg->contLen = pMsg->bytes;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) {
|
|
||||||
syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|
||||||
SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
|
|
||||||
char u64buf[128];
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
|
||||||
cJSON* pTmp = pSrcId;
|
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
|
||||||
cJSON* pTmp = pDestId;
|
|
||||||
char host[128];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
|
||||||
|
|
||||||
cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
|
|
||||||
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
|
|
||||||
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
|
||||||
cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot);
|
|
||||||
return pJson;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) {
|
|
||||||
cJSON* pJson = syncLocalCmd2Json(pMsg);
|
|
||||||
char* serialized = cJSON_Print(pJson);
|
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncLocalCmdPrint(const SyncLocalCmd* pMsg) {
|
|
||||||
char* serialized = syncLocalCmd2Str(pMsg);
|
|
||||||
printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) {
|
|
||||||
char* serialized = syncLocalCmd2Str(pMsg);
|
|
||||||
printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLocalCmdLog(const SyncLocalCmd* pMsg) {
|
|
||||||
char* serialized = syncLocalCmd2Str(pMsg);
|
|
||||||
sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
char* serialized = syncLocalCmd2Str(pMsg);
|
|
||||||
sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -457,6 +457,24 @@ SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
|
cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
|
||||||
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
|
char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
|
||||||
|
|
||||||
|
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
|
||||||
|
void syncLocalCmdDestroy(SyncLocalCmd* pMsg);
|
||||||
|
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
|
||||||
|
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
|
||||||
|
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
|
||||||
|
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg);
|
||||||
|
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
|
||||||
|
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
|
||||||
|
void syncLocalCmdLog(const SyncLocalCmd* pMsg);
|
||||||
|
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -2753,3 +2753,155 @@ char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg) {
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncLocalCmd);
|
||||||
|
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdDestroy(SyncLocalCmd* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
ASSERT(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
syncLocalCmdSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
syncLocalCmdDeserialize(buf, len, pMsg);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) {
|
||||||
|
syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
|
||||||
|
char u64buf[128];
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) {
|
||||||
|
cJSON* pJson = syncLocalCmd2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncLocalCmdPrint(const SyncLocalCmd* pMsg) {
|
||||||
|
char* serialized = syncLocalCmd2Str(pMsg);
|
||||||
|
printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) {
|
||||||
|
char* serialized = syncLocalCmd2Str(pMsg);
|
||||||
|
printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdLog(const SyncLocalCmd* pMsg) {
|
||||||
|
char* serialized = syncLocalCmd2Str(pMsg);
|
||||||
|
sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncLocalCmd2Str(pMsg);
|
||||||
|
sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue