Merge branch '3.0' into fix/TD-20299-D
This commit is contained in:
commit
622c31f15d
|
@ -119,7 +119,7 @@ taos -h tdengine -P 6030
|
||||||
FROM ubuntu:20.04
|
FROM ubuntu:20.04
|
||||||
RUN apt-get update && apt-get install -y wget
|
RUN apt-get update && apt-get install -y wget
|
||||||
ENV TDENGINE_VERSION=3.0.0.0
|
ENV TDENGINE_VERSION=3.0.0.0
|
||||||
RUN wget -c https://www.tdengine.com/assets-download/TDengine-client-${TDENGINE_VERSION}-Linux-x64.tar.gz \
|
RUN wget -c https://www.tdengine.com/assets-download/3.0/TDengine-client-${TDENGINE_VERSION}-Linux-x64.tar.gz \
|
||||||
&& tar xvf TDengine-client-${TDENGINE_VERSION}-Linux-x64.tar.gz \
|
&& tar xvf TDengine-client-${TDENGINE_VERSION}-Linux-x64.tar.gz \
|
||||||
&& cd TDengine-client-${TDENGINE_VERSION} \
|
&& cd TDengine-client-${TDENGINE_VERSION} \
|
||||||
&& ./install_client.sh \
|
&& ./install_client.sh \
|
||||||
|
|
|
@ -35,7 +35,7 @@ extern "C" {
|
||||||
// /\ Discard(m)
|
// /\ Discard(m)
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
|
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
|
||||||
//
|
//
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,24 +112,6 @@ typedef struct SyncAppendEntriesReply {
|
||||||
int64_t startTime;
|
int64_t startTime;
|
||||||
} SyncAppendEntriesReply;
|
} SyncAppendEntriesReply;
|
||||||
|
|
||||||
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
|
|
||||||
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
|
|
||||||
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
|
|
||||||
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
|
|
||||||
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
|
|
||||||
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
|
|
||||||
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
|
|
||||||
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
|
|
||||||
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
|
||||||
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
|
|
||||||
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
|
|
||||||
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
|
|
||||||
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
|
|
||||||
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
|
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
typedef struct SyncHeartbeat {
|
typedef struct SyncHeartbeat {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
|
@ -143,28 +125,8 @@ typedef struct SyncHeartbeat {
|
||||||
SyncIndex commitIndex;
|
SyncIndex commitIndex;
|
||||||
SyncTerm privateTerm;
|
SyncTerm privateTerm;
|
||||||
SyncTerm minMatchIndex;
|
SyncTerm minMatchIndex;
|
||||||
|
|
||||||
} SyncHeartbeat;
|
} SyncHeartbeat;
|
||||||
|
|
||||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
|
|
||||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
|
|
||||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
|
|
||||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
|
|
||||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
|
|
||||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
|
|
||||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
|
|
||||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
|
|
||||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
|
||||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
|
|
||||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
|
|
||||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
|
|
||||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg);
|
|
||||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
|
|
||||||
|
|
||||||
// ---------------------------------------------
|
|
||||||
typedef struct SyncHeartbeatReply {
|
typedef struct SyncHeartbeatReply {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -430,7 +392,7 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
|
||||||
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
|
int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
|
||||||
int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
|
int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
|
||||||
|
@ -438,7 +400,7 @@ int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
|
||||||
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||||
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
|
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
|
||||||
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
|
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
|
||||||
|
@ -452,14 +414,14 @@ ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
|
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
|
||||||
|
|
||||||
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
|
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
|
||||||
SSyncNode* pNode);
|
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginalRpc, uint64_t seq, bool isWeak, int32_t vgId);
|
||||||
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
|
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
|
||||||
bool isWeak, int32_t vgId);
|
|
||||||
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
|
|
||||||
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId);
|
||||||
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
|
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId);
|
||||||
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId);
|
||||||
|
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ extern "C" {
|
||||||
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
|
||||||
|
|
||||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncHeartbeat* pMsg);
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
|
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
|
||||||
|
|
|
@ -138,6 +138,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
|
@ -146,7 +147,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
|
int32_t code = syncBuildAppendEntriesReply(&rpcRsp, ths->vgId);
|
||||||
|
if (code != 0) {
|
||||||
|
syncLogRecvAppendEntries(ths, pMsg, "build rsp error");
|
||||||
|
goto _IGNORE;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* pReply = rpcRsp.pCont;
|
||||||
pReply->srcId = ths->myRaftId;
|
pReply->srcId = ths->myRaftId;
|
||||||
pReply->destId = pMsg->srcId;
|
pReply->destId = pMsg->srcId;
|
||||||
pReply->term = ths->pRaftStore->currentTerm;
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
|
@ -288,7 +295,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
goto _SEND_RESPONSE;
|
goto _SEND_RESPONSE;
|
||||||
|
|
||||||
_IGNORE:
|
_IGNORE:
|
||||||
syncAppendEntriesReplyDestroy(pReply);
|
rpcFreeCont(rpcRsp.pCont);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_SEND_RESPONSE:
|
_SEND_RESPONSE:
|
||||||
|
@ -296,10 +303,6 @@ _SEND_RESPONSE:
|
||||||
syncLogSendAppendEntriesReply(ths, pReply, "");
|
syncLogSendAppendEntriesReply(ths, pReply, "");
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp);
|
||||||
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
|
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
|
||||||
syncAppendEntriesReplyDestroy(pReply);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -73,8 +73,9 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
|
||||||
|
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
|
|
|
@ -134,9 +134,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
||||||
if (pSyncNode == NULL) return code;
|
if (pSyncNode == NULL) return code;
|
||||||
|
|
||||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||||
SyncHeartbeat* pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
code = syncNodeOnHeartbeat(pSyncNode, pMsg);
|
||||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
|
||||||
syncHeartbeatDestroy(pSyncMsg);
|
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||||
SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
SyncHeartbeatReply* pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||||
|
@ -152,9 +150,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||||
syncNodeOnAppendEntries(pSyncNode, pMsg);
|
syncNodeOnAppendEntries(pSyncNode, pMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
SyncAppendEntriesReply* pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
syncNodeOnAppendEntriesReply(pSyncNode, pMsg);
|
||||||
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
|
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
||||||
SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
SyncSnapshotSend* pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
|
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
|
||||||
|
@ -603,7 +599,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
|
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
|
||||||
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncClientRequestBuildFromRpcMsg(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
|
int32_t code = syncBuildClientRequest(&rpcMsg, pMsg, seqNum, isWeak, pSyncNode->vgId);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
|
sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr());
|
||||||
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
(void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||||
|
@ -1194,6 +1190,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
|
||||||
pSyncNode->syncSendMSg(&epSet, pMsg);
|
pSyncNode->syncSendMSg(&epSet, pMsg);
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
|
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1796,7 +1793,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
SSyncNode* pNode = param;
|
SSyncNode* pNode = param;
|
||||||
if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
|
if (atomic_load_64(&pNode->pingTimerLogicClockUser) <= atomic_load_64(&pNode->pingTimerLogicClock)) {
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
|
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
|
||||||
pNode->pingTimerMS, pNode);
|
pNode->pingTimerMS, pNode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sNError(pNode, "failed to build ping msg");
|
sNError(pNode, "failed to build ping msg");
|
||||||
|
@ -1826,7 +1823,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
SSyncNode* pNode = pElectTimer->pSyncNode;
|
SSyncNode* pNode = pElectTimer->pSyncNode;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
|
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sNError(pNode, "failed to build elect msg");
|
sNError(pNode, "failed to build elect msg");
|
||||||
|
@ -1865,7 +1862,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
||||||
if (pNode->replicaNum > 1) {
|
if (pNode->replicaNum > 1) {
|
||||||
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
|
if (atomic_load_64(&pNode->heartbeatTimerLogicClockUser) <= atomic_load_64(&pNode->heartbeatTimerLogicClock)) {
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncTimeoutBuild(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
|
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pNode->heartbeatTimerLogicClock),
|
||||||
pNode->heartbeatTimerMS, pNode);
|
pNode->heartbeatTimerMS, pNode);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1915,7 +1912,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
|
|
||||||
if (pSyncNode->replicaNum > 1) {
|
if (pSyncNode->replicaNum > 1) {
|
||||||
if (timerLogicClock == msgLogicClock) {
|
if (timerLogicClock == msgLogicClock) {
|
||||||
SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
|
SRpcMsg rpcMsg = {0};
|
||||||
|
(void)syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId);
|
||||||
|
|
||||||
|
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||||
pSyncMsg->destId = pData->destId;
|
pSyncMsg->destId = pData->destId;
|
||||||
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
|
@ -1923,28 +1923,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
|
||||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||||
pSyncMsg->privateTerm = 0;
|
pSyncMsg->privateTerm = 0;
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);
|
|
||||||
|
|
||||||
// eq msg
|
|
||||||
#if 0
|
|
||||||
if (pSyncNode->syncEqCtrlMsg != NULL) {
|
|
||||||
int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
|
|
||||||
if (code != 0) {
|
|
||||||
sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
|
||||||
syncHeartbeatDestroy(pSyncMsg);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
|
|
||||||
syncHeartbeatDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
|
taosTmrReset(syncNodeEqPeerHeartbeatTimer, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager,
|
||||||
|
@ -1972,7 +1952,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
|
||||||
if (pEntry == NULL) return -1;
|
if (pEntry == NULL) return -1;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncClientRequestBuildFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
|
int32_t code = syncBuildClientRequestFromNoopEntry(&rpcMsg, pEntry, pNode->vgId);
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
|
|
||||||
sNTrace(pNode, "propose msg, type:noop");
|
sNTrace(pNode, "propose msg, type:noop");
|
||||||
|
@ -2026,7 +2006,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncHeartbeat* pMsg = pRpcMsg->pCont;
|
||||||
syncLogRecvHeartbeat(ths, pMsg, "");
|
syncLogRecvHeartbeat(ths, pMsg, "");
|
||||||
|
|
||||||
SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
|
SyncHeartbeatReply* pMsgReply = syncHeartbeatReplyBuild(ths->vgId);
|
||||||
|
|
|
@ -20,18 +20,18 @@
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
|
||||||
int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
|
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
|
||||||
SSyncNode* pNode) {
|
SSyncNode* pNode) {
|
||||||
int32_t bytes = sizeof(SyncTimeout);
|
int32_t bytes = sizeof(SyncTimeout);
|
||||||
pTimeoutRpcMsg->pCont = rpcMallocCont(bytes);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
pTimeoutRpcMsg->msgType = TDMT_SYNC_TIMEOUT;
|
pMsg->msgType = TDMT_SYNC_TIMEOUT;
|
||||||
pTimeoutRpcMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pTimeoutRpcMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncTimeout* pTimeout = pTimeoutRpcMsg->pCont;
|
SyncTimeout* pTimeout = pMsg->pCont;
|
||||||
pTimeout->bytes = bytes;
|
pTimeout->bytes = bytes;
|
||||||
pTimeout->msgType = TDMT_SYNC_TIMEOUT;
|
pTimeout->msgType = TDMT_SYNC_TIMEOUT;
|
||||||
pTimeout->vgId = pNode->vgId;
|
pTimeout->vgId = pNode->vgId;
|
||||||
|
@ -42,41 +42,40 @@ int32_t syncTimeoutBuild(SRpcMsg* pTimeoutRpcMsg, ESyncTimeoutType timeoutType,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
|
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
|
||||||
bool isWeak, int32_t vgId) {
|
int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
|
||||||
int32_t bytes = sizeof(SyncClientRequest) + pOriginalRpcMsg->contLen;
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
pClientRequestRpcMsg->pCont = rpcMallocCont(bytes);
|
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pMsg->contLen = bytes;
|
||||||
pClientRequestRpcMsg->contLen = bytes;
|
if (pMsg->pCont == NULL) {
|
||||||
if (pClientRequestRpcMsg->pCont == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont;
|
SyncClientRequest* pClientRequest = pMsg->pCont;
|
||||||
pClientRequest->bytes = bytes;
|
pClientRequest->bytes = bytes;
|
||||||
pClientRequest->vgId = vgId;
|
pClientRequest->vgId = vgId;
|
||||||
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pClientRequest->originalRpcType = pOriginalRpcMsg->msgType;
|
pClientRequest->originalRpcType = pOriginal->msgType;
|
||||||
pClientRequest->seqNum = seqNum;
|
pClientRequest->seqNum = seqNum;
|
||||||
pClientRequest->isWeak = isWeak;
|
pClientRequest->isWeak = isWeak;
|
||||||
pClientRequest->dataLen = pOriginalRpcMsg->contLen;
|
pClientRequest->dataLen = pOriginal->contLen;
|
||||||
memcpy(pClientRequest->data, (char*)pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen);
|
memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
|
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
|
||||||
int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
|
int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
|
||||||
pClientRequestRpcMsg->pCont = rpcMallocCont(bytes);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
pClientRequestRpcMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
pClientRequestRpcMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pClientRequestRpcMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncClientRequest* pClientRequest = pClientRequestRpcMsg->pCont;
|
SyncClientRequest* pClientRequest = pMsg->pCont;
|
||||||
pClientRequest->bytes = bytes;
|
pClientRequest->bytes = bytes;
|
||||||
pClientRequest->vgId = vgId;
|
pClientRequest->vgId = vgId;
|
||||||
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
|
@ -93,7 +92,7 @@ int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TDMT_SYNC_REQUEST_VOTE;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,7 +109,7 @@ int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
|
||||||
pMsg->contLen = bytes;
|
pMsg->contLen = bytes;
|
||||||
if (pMsg->pCont == NULL) {
|
if (pMsg->pCont == NULL) {
|
||||||
terrno = TDMT_SYNC_REQUEST_VOTE;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -139,314 +138,40 @@ int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncAppendEntriesReply----
|
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
|
int32_t bytes = sizeof(SyncAppendEntriesReply);
|
||||||
uint32_t bytes = sizeof(SyncAppendEntriesReply);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
memset(pMsg, 0, bytes);
|
|
||||||
pMsg->bytes = bytes;
|
|
||||||
pMsg->vgId = vgId;
|
|
||||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
||||||
return pMsg;
|
pMsg->contLen = bytes;
|
||||||
}
|
if (pMsg->pCont == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) {
|
return -1;
|
||||||
if (pMsg != NULL) {
|
|
||||||
taosMemoryFree(pMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) {
|
|
||||||
ASSERT(pMsg->bytes <= bufLen);
|
|
||||||
memcpy(buf, pMsg, pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) {
|
|
||||||
memcpy(pMsg, buf, len);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) {
|
|
||||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes);
|
|
||||||
if (len != NULL) {
|
|
||||||
*len = pMsg->bytes;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) {
|
|
||||||
uint32_t bytes = *((uint32_t*)buf);
|
|
||||||
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
syncAppendEntriesReplyDeserialize(buf, len, pMsg);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) {
|
|
||||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
|
||||||
pRpcMsg->msgType = pMsg->msgType;
|
|
||||||
pRpcMsg->contLen = pMsg->bytes;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) {
|
|
||||||
syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|
||||||
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
|
||||||
char u64buf[128] = {0};
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
|
||||||
cJSON* pTmp = pSrcId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
|
||||||
cJSON* pTmp = pDestId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
|
||||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex);
|
|
||||||
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
|
|
||||||
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
|
||||||
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot);
|
pAppendEntriesReply->bytes = bytes;
|
||||||
return pJson;
|
pAppendEntriesReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
||||||
|
pAppendEntriesReply->vgId = vgId;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) {
|
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
|
||||||
cJSON* pJson = syncAppendEntriesReply2Json(pMsg);
|
int32_t bytes = sizeof(SyncHeartbeat);
|
||||||
char* serialized = cJSON_Print(pJson);
|
pMsg->pCont = rpcMallocCont(bytes);
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
// for debug ----------------------
|
|
||||||
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) {
|
|
||||||
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
|
||||||
printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) {
|
|
||||||
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
|
||||||
printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) {
|
|
||||||
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
|
||||||
sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
|
||||||
sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- message process SyncHeartbeat----
|
|
||||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) {
|
|
||||||
uint32_t bytes = sizeof(SyncHeartbeat);
|
|
||||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
memset(pMsg, 0, bytes);
|
|
||||||
pMsg->bytes = bytes;
|
|
||||||
pMsg->vgId = vgId;
|
|
||||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||||
return pMsg;
|
pMsg->contLen = bytes;
|
||||||
}
|
if (pMsg->pCont == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg) {
|
return -1;
|
||||||
if (pMsg != NULL) {
|
|
||||||
taosMemoryFree(pMsg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) {
|
|
||||||
ASSERT(pMsg->bytes <= bufLen);
|
|
||||||
memcpy(buf, pMsg, pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) {
|
|
||||||
memcpy(pMsg, buf, len);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) {
|
|
||||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
|
||||||
ASSERT(buf != NULL);
|
|
||||||
syncHeartbeatSerialize(pMsg, buf, pMsg->bytes);
|
|
||||||
if (len != NULL) {
|
|
||||||
*len = pMsg->bytes;
|
|
||||||
}
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) {
|
|
||||||
uint32_t bytes = *((uint32_t*)buf);
|
|
||||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
syncHeartbeatDeserialize(buf, len, pMsg);
|
|
||||||
ASSERT(len == pMsg->bytes);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) {
|
|
||||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
|
||||||
pRpcMsg->msgType = pMsg->msgType;
|
|
||||||
pRpcMsg->contLen = pMsg->bytes;
|
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
|
||||||
syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) {
|
|
||||||
syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
|
||||||
SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
|
||||||
ASSERT(pMsg != NULL);
|
|
||||||
return pMsg;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) {
|
|
||||||
char u64buf[128] = {0};
|
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pMsg != NULL) {
|
|
||||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
|
||||||
|
|
||||||
cJSON* pSrcId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
|
||||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->srcId.addr;
|
|
||||||
cJSON* pTmp = pSrcId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
|
||||||
|
|
||||||
cJSON* pDestId = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
|
||||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pMsg->destId.addr;
|
|
||||||
cJSON* pTmp = pDestId;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
|
||||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
|
||||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
|
|
||||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
SyncHeartbeat* pHeartbeat = pMsg->pCont;
|
||||||
cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot);
|
pHeartbeat->bytes = bytes;
|
||||||
return pJson;
|
pHeartbeat->msgType = TDMT_SYNC_HEARTBEAT;
|
||||||
|
pHeartbeat->vgId = vgId;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) {
|
|
||||||
cJSON* pJson = syncHeartbeat2Json(pMsg);
|
|
||||||
char* serialized = cJSON_Print(pJson);
|
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg) {
|
|
||||||
char* serialized = syncHeartbeat2Str(pMsg);
|
|
||||||
printf("syncHeartbeatPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) {
|
|
||||||
char* serialized = syncHeartbeat2Str(pMsg);
|
|
||||||
printf("syncHeartbeatPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg) {
|
|
||||||
char* serialized = syncHeartbeat2Str(pMsg);
|
|
||||||
sTrace("syncHeartbeatLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) {
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
char* serialized = syncHeartbeat2Str(pMsg);
|
|
||||||
sTrace("syncHeartbeatLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- message process SyncHeartbeatReply----
|
// ---- message process SyncHeartbeatReply----
|
||||||
SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId) {
|
SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId) {
|
||||||
|
|
|
@ -186,19 +186,19 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* dest
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) {
|
int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcMsg* pMsg) {
|
||||||
int32_t ret = 0;
|
syncLogSendHeartbeat(pSyncNode, pMsg->pCont, "");
|
||||||
syncLogSendHeartbeat(pSyncNode, pMsg, "");
|
return syncNodeSendMsgById(destId, pSyncNode, pMsg);
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
|
|
||||||
syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &rpcMsg);
|
|
||||||
return ret;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
SyncHeartbeat* pSyncMsg = syncHeartbeatBuild(pSyncNode->vgId);
|
SRpcMsg rpcMsg = {0};
|
||||||
|
if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncHeartbeat* pSyncMsg = rpcMsg.pCont;
|
||||||
pSyncMsg->srcId = pSyncNode->myRaftId;
|
pSyncMsg->srcId = pSyncNode->myRaftId;
|
||||||
pSyncMsg->destId = pSyncNode->peersId[i];
|
pSyncMsg->destId = pSyncNode->peersId[i];
|
||||||
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||||
|
@ -206,13 +206,8 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||||
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
|
||||||
pSyncMsg->privateTerm = 0;
|
pSyncMsg->privateTerm = 0;
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg);
|
|
||||||
|
|
||||||
// send msg
|
// send msg
|
||||||
syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg);
|
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
|
||||||
|
|
||||||
syncHeartbeatDestroy(pSyncMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -45,7 +45,7 @@ SyncClientRequest *createSyncClientRequest() {
|
||||||
strcpy((char *)rpcMsg.pCont, "hello rpc");
|
strcpy((char *)rpcMsg.pCont, "hello rpc");
|
||||||
|
|
||||||
SRpcMsg clientRequestMsg;
|
SRpcMsg clientRequestMsg;
|
||||||
syncClientRequestBuildFromRpcMsg(&clientRequestMsg, &rpcMsg, 123, true, 1000);
|
syncBuildClientRequest(&clientRequestMsg, &rpcMsg, 123, true, 1000);
|
||||||
SyncClientRequest *pMsg = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen);
|
SyncClientRequest *pMsg = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen);
|
||||||
memcpy(pMsg->data, clientRequestMsg.pCont, clientRequestMsg.contLen);
|
memcpy(pMsg->data, clientRequestMsg.pCont, clientRequestMsg.contLen);
|
||||||
return pMsg;
|
return pMsg;
|
||||||
|
|
|
@ -154,7 +154,7 @@ SRpcMsg *step0() {
|
||||||
|
|
||||||
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
SyncClientRequest *step1(const SRpcMsg *pMsg) {
|
||||||
SRpcMsg clientRequestMsg;
|
SRpcMsg clientRequestMsg;
|
||||||
syncClientRequestBuildFromRpcMsg(&clientRequestMsg, pMsg, 123, true, 1000);
|
syncBuildClientRequest(&clientRequestMsg, pMsg, 123, true, 1000);
|
||||||
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen);
|
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(clientRequestMsg.contLen);
|
||||||
memcpy(pMsg2->data, clientRequestMsg.pCont, clientRequestMsg.contLen);
|
memcpy(pMsg2->data, clientRequestMsg.pCont, clientRequestMsg.contLen);
|
||||||
return pMsg2;
|
return pMsg2;
|
||||||
|
|
|
@ -295,6 +295,42 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
|
||||||
void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
|
void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
|
||||||
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
|
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
|
||||||
|
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
|
||||||
|
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
|
||||||
|
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
|
||||||
|
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
|
||||||
|
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
|
||||||
|
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
|
||||||
|
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
|
||||||
|
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
|
||||||
|
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
|
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
|
||||||
|
void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
|
||||||
|
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
|
||||||
|
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
|
||||||
|
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
|
||||||
|
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
|
||||||
|
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
|
||||||
|
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
|
||||||
|
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
|
||||||
|
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
|
||||||
|
void syncHeartbeatLog(const SyncHeartbeat* pMsg);
|
||||||
|
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1479,3 +1479,312 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- message process SyncAppendEntriesReply----
|
||||||
|
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncAppendEntriesReply);
|
||||||
|
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
ASSERT(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
syncAppendEntriesReplyDeserialize(buf, len, pMsg);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncAppendEntriesReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg) {
|
||||||
|
syncAppendEntriesReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
|
||||||
|
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncAppendEntriesReply", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg) {
|
||||||
|
cJSON* pJson = syncAppendEntriesReply2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg) {
|
||||||
|
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
||||||
|
printf("syncAppendEntriesReplyPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg) {
|
||||||
|
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
||||||
|
printf("syncAppendEntriesReplyPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) {
|
||||||
|
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
||||||
|
sTrace("syncAppendEntriesReplyLog | len:%d| %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
||||||
|
sTrace("syncAppendEntriesReplyLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- message process SyncHeartbeat----
|
||||||
|
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) {
|
||||||
|
uint32_t bytes = sizeof(SyncHeartbeat);
|
||||||
|
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
memset(pMsg, 0, bytes);
|
||||||
|
pMsg->bytes = bytes;
|
||||||
|
pMsg->vgId = vgId;
|
||||||
|
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatDestroy(SyncHeartbeat* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) {
|
||||||
|
ASSERT(pMsg->bytes <= bufLen);
|
||||||
|
memcpy(buf, pMsg, pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) {
|
||||||
|
memcpy(pMsg, buf, len);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) {
|
||||||
|
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
syncHeartbeatSerialize(pMsg, buf, pMsg->bytes);
|
||||||
|
if (len != NULL) {
|
||||||
|
*len = pMsg->bytes;
|
||||||
|
}
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) {
|
||||||
|
uint32_t bytes = *((uint32_t*)buf);
|
||||||
|
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
syncHeartbeatDeserialize(buf, len, pMsg);
|
||||||
|
ASSERT(len == pMsg->bytes);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) {
|
||||||
|
syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
|
||||||
|
cJSON* pSrcId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||||
|
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->srcId.addr;
|
||||||
|
cJSON* pTmp = pSrcId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||||
|
|
||||||
|
cJSON* pDestId = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||||
|
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pMsg->destId.addr;
|
||||||
|
cJSON* pTmp = pDestId;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||||
|
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) {
|
||||||
|
cJSON* pJson = syncHeartbeat2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatPrint(const SyncHeartbeat* pMsg) {
|
||||||
|
char* serialized = syncHeartbeat2Str(pMsg);
|
||||||
|
printf("syncHeartbeatPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) {
|
||||||
|
char* serialized = syncHeartbeat2Str(pMsg);
|
||||||
|
printf("syncHeartbeatPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatLog(const SyncHeartbeat* pMsg) {
|
||||||
|
char* serialized = syncHeartbeat2Str(pMsg);
|
||||||
|
sTrace("syncHeartbeatLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncHeartbeat2Str(pMsg);
|
||||||
|
sTrace("syncHeartbeatLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
|
@ -13,7 +13,7 @@
|
||||||
,,,script,./test.sh -f tsim/db/alter_option.sim
|
,,,script,./test.sh -f tsim/db/alter_option.sim
|
||||||
,,,script,./test.sh -f tsim/db/alter_replica_13.sim
|
,,,script,./test.sh -f tsim/db/alter_replica_13.sim
|
||||||
,,,script,./test.sh -f tsim/db/alter_replica_31.sim
|
,,,script,./test.sh -f tsim/db/alter_replica_31.sim
|
||||||
,,,script,./test.sh -f tsim/db/basic1.sim
|
,,y,script,./test.sh -f tsim/db/basic1.sim
|
||||||
,,,script,./test.sh -f tsim/db/basic2.sim
|
,,,script,./test.sh -f tsim/db/basic2.sim
|
||||||
,,,script,./test.sh -f tsim/db/basic3.sim
|
,,,script,./test.sh -f tsim/db/basic3.sim
|
||||||
,,,script,./test.sh -f tsim/db/basic4.sim
|
,,,script,./test.sh -f tsim/db/basic4.sim
|
||||||
|
|
Loading…
Reference in New Issue