enh: adjust request vote msg
This commit is contained in:
parent
26cb3c3856
commit
8cca68f29d
|
@ -34,9 +34,7 @@ extern "C" {
|
||||||
// mdest |-> j])
|
// mdest |-> j])
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
|
|
||||||
int32_t syncNodeElect(SSyncNode* pSyncNode);
|
int32_t syncNodeElect(SSyncNode* pNode);
|
||||||
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
|
|
||||||
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -324,7 +324,6 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
|
||||||
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
|
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s);
|
||||||
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
|
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
|
||||||
|
|
||||||
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
|
|
||||||
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
|
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
|
||||||
|
|
||||||
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
|
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
|
||||||
|
|
|
@ -28,8 +28,6 @@ typedef enum ESyncTimeoutType {
|
||||||
SYNC_TIMEOUT_HEARTBEAT,
|
SYNC_TIMEOUT_HEARTBEAT,
|
||||||
} ESyncTimeoutType;
|
} ESyncTimeoutType;
|
||||||
|
|
||||||
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
|
|
||||||
|
|
||||||
typedef struct SyncTimeout {
|
typedef struct SyncTimeout {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -40,9 +38,6 @@ typedef struct SyncTimeout {
|
||||||
void* data; // need optimized
|
void* data; // need optimized
|
||||||
} SyncTimeout;
|
} SyncTimeout;
|
||||||
|
|
||||||
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
|
|
||||||
SSyncNode* pNode);
|
|
||||||
|
|
||||||
typedef struct SyncClientRequest {
|
typedef struct SyncClientRequest {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -54,10 +49,6 @@ typedef struct SyncClientRequest {
|
||||||
char data[]; // origin RpcMsg.pCont
|
char data[]; // origin RpcMsg.pCont
|
||||||
} SyncClientRequest;
|
} SyncClientRequest;
|
||||||
|
|
||||||
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
|
|
||||||
bool isWeak, int32_t vgId);
|
|
||||||
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
|
|
||||||
|
|
||||||
typedef struct SyncClientRequestReply {
|
typedef struct SyncClientRequestReply {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -78,25 +69,6 @@ typedef struct SyncRequestVote {
|
||||||
SyncTerm lastLogTerm;
|
SyncTerm lastLogTerm;
|
||||||
} SyncRequestVote;
|
} SyncRequestVote;
|
||||||
|
|
||||||
SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
|
|
||||||
void syncRequestVoteDestroy(SyncRequestVote* pMsg);
|
|
||||||
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
|
|
||||||
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
|
|
||||||
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
|
|
||||||
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
|
|
||||||
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
|
|
||||||
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
|
|
||||||
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
|
||||||
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
|
|
||||||
char* syncRequestVote2Str(const SyncRequestVote* pMsg);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncRequestVotePrint(const SyncRequestVote* pMsg);
|
|
||||||
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
|
|
||||||
void syncRequestVoteLog(const SyncRequestVote* pMsg);
|
|
||||||
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
|
|
||||||
|
|
||||||
// ---------------------------------------------
|
|
||||||
typedef struct SyncRequestVoteReply {
|
typedef struct SyncRequestVoteReply {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -505,8 +477,7 @@ void syncLocalCmdLog(const SyncLocalCmd* pMsg);
|
||||||
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
|
void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
|
||||||
|
|
||||||
// on message ----------------------
|
// on message ----------------------
|
||||||
|
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
|
|
||||||
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
|
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
|
@ -530,6 +501,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
|
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
|
||||||
|
|
||||||
|
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
|
||||||
|
SSyncNode* pNode);
|
||||||
|
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
|
||||||
|
bool isWeak, int32_t vgId);
|
||||||
|
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
|
||||||
|
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -44,7 +44,7 @@ extern "C" {
|
||||||
// m)
|
// m)
|
||||||
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
|
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,41 @@
|
||||||
// mdest |-> j])
|
// mdest |-> j])
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
|
|
||||||
|
static void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const char* s) {
|
||||||
|
char host[64];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
||||||
|
sNTrace(pNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s", host,
|
||||||
|
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
|
||||||
|
if (pNode->state != TAOS_SYNC_STATE_CANDIDATE) {
|
||||||
|
sNTrace(pNode, "not candidate, stop elect");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = 0;
|
||||||
|
for (int i = 0; i < pNode->peersNum; ++i) {
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
ret = syncBuildRequestVote(&rpcMsg, pNode->vgId);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
SyncRequestVote* pMsg = rpcMsg.pCont;
|
||||||
|
pMsg->srcId = pNode->myRaftId;
|
||||||
|
pMsg->destId = pNode->peersId[i];
|
||||||
|
pMsg->term = pNode->pRaftStore->currentTerm;
|
||||||
|
|
||||||
|
ret = syncNodeGetLastIndexTerm(pNode, &pMsg->lastLogIndex, &pMsg->lastLogTerm);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
ret = syncNodeSendMsgById(&pNode->peersId[i], pNode, &rpcMsg);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
sNTrace(pSyncNode, "begin election");
|
sNTrace(pSyncNode, "begin election");
|
||||||
|
|
||||||
|
@ -81,36 +116,3 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
|
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
|
|
||||||
sNTrace(pSyncNode, "not candidate, stop elect");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ret = 0;
|
|
||||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
|
||||||
SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId);
|
|
||||||
pMsg->srcId = pSyncNode->myRaftId;
|
|
||||||
pMsg->destId = pSyncNode->peersId[i];
|
|
||||||
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
|
||||||
|
|
||||||
ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm));
|
|
||||||
ASSERT(ret == 0);
|
|
||||||
|
|
||||||
ret = syncNodeSendRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
|
|
||||||
ASSERT(ret == 0);
|
|
||||||
syncRequestVoteDestroy(pMsg);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
|
|
||||||
int32_t ret = 0;
|
|
||||||
syncLogSendRequestVote(pSyncNode, pMsg, "");
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
|
|
||||||
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
|
||||||
return ret;
|
|
||||||
}
|
|
|
@ -146,9 +146,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||||
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
|
code = syncNodeOnClientRequest(pSyncNode, pMsg, NULL);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVote* pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
syncNodeOnRequestVote(pSyncNode, pMsg);
|
||||||
code = syncNodeOnRequestVote(pSyncNode, pSyncMsg);
|
|
||||||
syncRequestVoteDestroy(pSyncMsg);
|
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
SyncRequestVoteReply* pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
|
code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg);
|
||||||
|
@ -2535,14 +2533,6 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char*
|
||||||
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
|
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
|
|
||||||
char host[64];
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
|
|
||||||
sNTrace(pSyncNode, "send sync-request-vote to %s:%d {term:%" PRId64 ", lindex:%" PRId64 ", lterm:%" PRId64 "}, %s",
|
|
||||||
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
|
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
char host[64];
|
char host[64];
|
||||||
|
|
|
@ -88,155 +88,21 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncRequestVote----
|
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
int32_t bytes = sizeof(SyncRequestVote);
|
||||||
uint32_t bytes = sizeof(SyncRequestVote);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
memset(pMsg, 0, bytes);
|
|
||||||
pMsg->bytes = bytes;
|
|
||||||
pMsg->vgId = vgId;
|
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||||
return pMsg;
|
pMsg->contLen = bytes;
|
||||||
}
|
if (pMsg->pCont == NULL) {
|
||||||
|
terrno = TDMT_SYNC_REQUEST_VOTE;
|
||||||
void syncRequestVoteDestroy(SyncRequestVote* pMsg) {
|
return -1;
|
||||||
if (pMsg != NULL) {
|
|
||||||
taosMemoryFree(pMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) {
|
|
||||||
ASSERT(pMsg->bytes <= bufLen);
|
|
||||||
memcpy(buf, pMsg, pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) {
|
|
||||||
memcpy(pMsg, buf, len);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) {
|
|
||||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
syncRequestVoteSerialize(pMsg, buf, pMsg->bytes);
|
|
||||||
if (len != NULL) {
|
|
||||||
*len = pMsg->bytes;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) {
|
|
||||||
uint32_t bytes = *((uint32_t*)buf);
|
|
||||||
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
syncRequestVoteDeserialize(buf, len, pMsg);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) {
|
|
||||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
|
||||||
pRpcMsg->msgType = pMsg->msgType;
|
|
||||||
pRpcMsg->contLen = pMsg->bytes;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) {
|
|
||||||
syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|
||||||
SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
|
||||||
char u64buf[128] = {0};
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
|
||||||
cJSON* pTmp = pSrcId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
|
||||||
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
|
||||||
cJSON* pTmp = pDestId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastLogIndex);
|
|
||||||
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm);
|
|
||||||
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
SyncRequestVote* pRequestVote = pMsg->pCont;
|
||||||
cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot);
|
pRequestVote->bytes = bytes;
|
||||||
return pJson;
|
pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||||
}
|
pRequestVote->vgId = vgId;
|
||||||
|
return 0;
|
||||||
char* syncRequestVote2Str(const SyncRequestVote* pMsg) {
|
|
||||||
cJSON* pJson = syncRequestVote2Json(pMsg);
|
|
||||||
char* serialized = cJSON_Print(pJson);
|
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncRequestVotePrint(const SyncRequestVote* pMsg) {
|
|
||||||
char* serialized = syncRequestVote2Str(pMsg);
|
|
||||||
printf("syncRequestVotePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) {
|
|
||||||
char* serialized = syncRequestVote2Str(pMsg);
|
|
||||||
printf("syncRequestVotePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVoteLog(const SyncRequestVote* pMsg) {
|
|
||||||
char* serialized = syncRequestVote2Str(pMsg);
|
|
||||||
sTrace("syncRequestVoteLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
char* serialized = syncRequestVote2Str(pMsg);
|
|
||||||
sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncRequestVoteReply----
|
// ---- message process SyncRequestVoteReply----
|
||||||
|
|
|
@ -88,8 +88,9 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) {
|
int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
SyncRequestVote* pMsg = pRpcMsg->pCont;
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
|
|
|
@ -241,6 +241,24 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMs
|
||||||
void syncClientRequestLog(const SyncClientRequest* pMsg);
|
void syncClientRequestLog(const SyncClientRequest* pMsg);
|
||||||
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
|
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
|
||||||
|
|
||||||
|
SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
|
||||||
|
void syncRequestVoteDestroy(SyncRequestVote* pMsg);
|
||||||
|
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
|
||||||
|
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
|
||||||
|
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
|
||||||
|
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
|
||||||
|
char* syncRequestVote2Str(const SyncRequestVote* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncRequestVotePrint(const SyncRequestVote* pMsg);
|
||||||
|
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
|
||||||
|
void syncRequestVoteLog(const SyncRequestVote* pMsg);
|
||||||
|
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1009,3 +1009,154 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- message process SyncRequestVote----
|
||||||
|
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncRequestVote);
|
||||||
|
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVoteDestroy(SyncRequestVote* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
ASSERT(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
syncRequestVoteSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncRequestVote* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
syncRequestVoteDeserialize(buf, len, pMsg);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncRequestVoteSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) {
|
||||||
|
syncRequestVoteDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastLogIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->lastLogTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncRequestVote", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncRequestVote2Str(const SyncRequestVote* pMsg) {
|
||||||
|
cJSON* pJson = syncRequestVote2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncRequestVotePrint(const SyncRequestVote* pMsg) {
|
||||||
|
char* serialized = syncRequestVote2Str(pMsg);
|
||||||
|
printf("syncRequestVotePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg) {
|
||||||
|
char* serialized = syncRequestVote2Str(pMsg);
|
||||||
|
printf("syncRequestVotePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVoteLog(const SyncRequestVote* pMsg) {
|
||||||
|
char* serialized = syncRequestVote2Str(pMsg);
|
||||||
|
sTrace("syncRequestVoteLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncRequestVote2Str(pMsg);
|
||||||
|
sTrace("syncRequestVoteLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue