Merge pull request #17778 from taosdata/feature/3.0_mhli
refactor(sync): add sync local cmd, step down
This commit is contained in:
commit
f32d347bb8
|
@ -232,6 +232,8 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader);
|
||||||
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
|
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
|
||||||
int32_t syncEndSnapshot(int64_t rid);
|
int32_t syncEndSnapshot(int64_t rid);
|
||||||
|
|
||||||
|
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -679,10 +679,13 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
|
||||||
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
|
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SYNC_LOCAL_CMD_STEP_DOWN = 100,
|
SYNC_LOCAL_CMD_STEP_DOWN = 100,
|
||||||
} ESyncLocalCmd;
|
} ESyncLocalCmd;
|
||||||
|
|
||||||
|
const char* syncLocalCmdGetStr(int32_t cmd);
|
||||||
|
|
||||||
typedef struct SyncLocalCmd {
|
typedef struct SyncLocalCmd {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
|
|
@ -586,6 +586,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||||
|
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
||||||
|
syncLocalCmdDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
|
@ -350,6 +350,11 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
|
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
|
||||||
syncSnapshotRspDestroy(pSyncMsg);
|
syncSnapshotRspDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pMsg->msgType == TDMT_SYNC_LOCAL_CMD) {
|
||||||
|
SyncLocalCmd *pSyncMsg = syncLocalCmdFromRpcMsg2(pMsg);
|
||||||
|
code = syncNodeOnLocalCmd(pSyncNode, pSyncMsg);
|
||||||
|
syncLocalCmdDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
|
||||||
code = -1;
|
code = -1;
|
||||||
|
|
|
@ -341,6 +341,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
|
||||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||||
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
|
||||||
|
|
||||||
|
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s);
|
||||||
|
|
||||||
// for debug --------------
|
// for debug --------------
|
||||||
void syncNodePrint(SSyncNode* pObj);
|
void syncNodePrint(SSyncNode* pObj);
|
||||||
void syncNodePrint2(char* s, SSyncNode* pObj);
|
void syncNodePrint2(char* s, SSyncNode* pObj);
|
||||||
|
|
|
@ -528,6 +528,20 @@ int32_t syncEndSnapshot(int64_t rid) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
ASSERT(rid == pSyncNode->rid);
|
||||||
|
|
||||||
|
syncNodeStepDown(pSyncNode, newTerm);
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
|
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode->peersNum == 0) {
|
if (pSyncNode->peersNum == 0) {
|
||||||
sDebug("only one replica, cannot leader transfer");
|
sDebug("only one replica, cannot leader transfer");
|
||||||
|
@ -3041,12 +3055,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
|
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
|
||||||
|
|
||||||
#if 1
|
|
||||||
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
|
||||||
syncNodeStepDown(ths, pMsg->term);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
syncNodeResetElectTimer(ths);
|
syncNodeResetElectTimer(ths);
|
||||||
ths->minMatchIndex = pMsg->minMatchIndex;
|
ths->minMatchIndex = pMsg->minMatchIndex;
|
||||||
|
@ -3058,6 +3066,28 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
// syncNodeStepDown(ths, pMsg->term);
|
||||||
|
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
|
||||||
|
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
|
||||||
|
pSyncMsg->sdNewTerm = pMsg->term;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsgLocalCmd;
|
||||||
|
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
|
||||||
|
|
||||||
|
if (ths->FpEqMsg != NULL && ths->msgcb != NULL) {
|
||||||
|
int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd);
|
||||||
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
|
||||||
|
rpcFreeCont(rpcMsgLocalCmd.pCont);
|
||||||
|
} else {
|
||||||
|
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRIu64, ths->vgId, pSyncMsg->sdNewTerm);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
syncLocalCmdDestroy(pSyncMsg);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// htonl
|
// htonl
|
||||||
SMsgHead* pHead = rpcMsg.pCont;
|
SMsgHead* pHead = rpcMsg.pCont;
|
||||||
|
@ -3081,6 +3111,19 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
|
||||||
|
syncLogRecvLocalCmd(ths, pMsg, "");
|
||||||
|
|
||||||
|
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
||||||
|
syncNodeStepDown(ths, pMsg->sdNewTerm);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
syncNodeErrorLog(ths, "error local cmd");
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// ClientRequest(i, v) ==
|
// ClientRequest(i, v) ==
|
||||||
// /\ state[i] = Leader
|
// /\ state[i] = Leader
|
||||||
|
@ -3379,7 +3422,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
||||||
// ASSERT(code == 0);
|
// ASSERT(code == 0);
|
||||||
// ASSERT(pEntry != NULL);
|
// ASSERT(pEntry != NULL);
|
||||||
if (code != 0 || pEntry == NULL) {
|
if (code != 0 || pEntry == NULL) {
|
||||||
syncNodeErrorLog(ths, "get log entry error");
|
syncNodeErrorLog(ths, "get log entry error");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3743,3 +3786,10 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
|
||||||
host, port, pMsg->term, pMsg->privateTerm, s);
|
host, port, pMsg->term, pMsg->privateTerm, s);
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
|
||||||
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRIu64 "}, %s", pMsg->cmd,
|
||||||
|
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, s);
|
||||||
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
}
|
|
@ -3097,6 +3097,14 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
const char* syncLocalCmdGetStr(int32_t cmd) {
|
||||||
|
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
||||||
|
return "step-down";
|
||||||
|
}
|
||||||
|
|
||||||
|
return "unknown-local-cmd";
|
||||||
|
}
|
||||||
|
|
||||||
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
|
SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) {
|
||||||
uint32_t bytes = sizeof(SyncLocalCmd);
|
uint32_t bytes = sizeof(SyncLocalCmd);
|
||||||
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
SyncLocalCmd* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
|
Loading…
Reference in New Issue