refactor(sync): add SyncLocalCmd
This commit is contained in:
parent
bbb624b63e
commit
8b5d005db7
|
@ -269,6 +269,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) // no longer used
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL)
|
||||
|
||||
#if defined(TD_MSG_NUMBER_)
|
||||
|
|
|
@ -679,6 +679,10 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
|
|||
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef enum {
|
||||
SYNC_STEP_DOWN = 0,
|
||||
} ESyncLocalCmd;
|
||||
|
||||
typedef struct SyncLocalCmd {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
|
@ -686,8 +690,8 @@ typedef struct SyncLocalCmd {
|
|||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
|
||||
int8_t cmd;
|
||||
SyncTerm sdNewTerm // step down new term
|
||||
int32_t cmd;
|
||||
SyncTerm sdNewTerm; // step down new term
|
||||
|
||||
} SyncLocalCmd;
|
||||
|
||||
|
|
|
@ -3095,3 +3095,153 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
|
|||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------
|
||||
SyncLocalCmd* syncLocalCmdBuild(uint32_t dataLen, int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncLocalCmd);
|
||||
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncLocalCmdDestroy(SyncLocalCmd* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncLocalCmdSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncLocalCmdDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) {
|
||||
syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
|
||||
char u64buf[128];
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
|
||||
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) {
|
||||
cJSON* pJson = syncLocalCmd2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncLocalCmdPrint(const SyncLocalCmd* pMsg) {
|
||||
char* serialized = syncLocalCmd2Str(pMsg);
|
||||
printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) {
|
||||
char* serialized = syncLocalCmd2Str(pMsg);
|
||||
printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncLocalCmdLog(const SyncLocalCmd* pMsg) {
|
||||
char* serialized = syncLocalCmd2Str(pMsg);
|
||||
sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncLocalCmd2Str(pMsg);
|
||||
sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue