Merge pull request #17856 from taosdata/feature/3.0_mhli
enh(sync): add sync pre stop, local-cmd:follower-commit
This commit is contained in:
commit
6cef3fa4fa
|
@ -211,6 +211,7 @@ void syncCleanUp();
|
||||||
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
||||||
void syncStart(int64_t rid);
|
void syncStart(int64_t rid);
|
||||||
void syncStop(int64_t rid);
|
void syncStop(int64_t rid);
|
||||||
|
void syncPreStop(int64_t rid);
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||||
|
|
|
@ -429,6 +429,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
void mndPreClose(SMnode *pMnode) {
|
void mndPreClose(SMnode *pMnode) {
|
||||||
if (pMnode != NULL) {
|
if (pMnode != NULL) {
|
||||||
syncLeaderTransfer(pMnode->syncMgmt.sync);
|
syncLeaderTransfer(pMnode->syncMgmt.sync);
|
||||||
|
syncPreStop(pMnode->syncMgmt.sync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -245,6 +245,7 @@ _err:
|
||||||
void vnodePreClose(SVnode *pVnode) {
|
void vnodePreClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
syncLeaderTransfer(pVnode->sync);
|
syncLeaderTransfer(pVnode->sync);
|
||||||
|
syncPreStop(pVnode->sync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -218,6 +218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
|
||||||
void syncNodeStart(SSyncNode* pSyncNode);
|
void syncNodeStart(SSyncNode* pSyncNode);
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||||
void syncNodeClose(SSyncNode* pSyncNode);
|
void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
|
void syncNodePreClose(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
||||||
|
|
||||||
// option
|
// option
|
||||||
|
|
|
@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SYNC_LOCAL_CMD_STEP_DOWN = 100,
|
SYNC_LOCAL_CMD_STEP_DOWN = 100,
|
||||||
|
SYNC_LOCAL_CMD_FOLLOWER_CMT,
|
||||||
} ESyncLocalCmd;
|
} ESyncLocalCmd;
|
||||||
|
|
||||||
const char* syncLocalCmdGetStr(int32_t cmd);
|
const char* syncLocalCmdGetStr(int32_t cmd);
|
||||||
|
@ -742,6 +743,7 @@ typedef struct SyncLocalCmd {
|
||||||
|
|
||||||
int32_t cmd;
|
int32_t cmd;
|
||||||
SyncTerm sdNewTerm; // step down new term
|
SyncTerm sdNewTerm; // step down new term
|
||||||
|
SyncIndex fcIndex;// follower commit index
|
||||||
|
|
||||||
} SyncLocalCmd;
|
} SyncLocalCmd;
|
||||||
|
|
||||||
|
|
|
@ -90,6 +90,11 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
||||||
|
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
syncNodeEventLog(ths, "can not do follower commit");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// maybe update commit index, leader notice me
|
// maybe update commit index, leader notice me
|
||||||
if (newCommitIndex > ths->commitIndex) {
|
if (newCommitIndex > ths->commitIndex) {
|
||||||
// has commit entry in local
|
// has commit entry in local
|
||||||
|
|
|
@ -81,6 +81,15 @@ void syncStop(int64_t rid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncPreStop(int64_t rid) {
|
||||||
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
|
if (pSyncNode == NULL) return;
|
||||||
|
|
||||||
|
syncNodePreClose(pSyncNode);
|
||||||
|
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
|
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
|
||||||
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
|
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
|
||||||
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
|
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
|
||||||
|
@ -435,8 +444,12 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
|
int32_t ret = 0;
|
||||||
int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
|
||||||
|
ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1222,6 +1235,14 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||||
|
// stop elect timer
|
||||||
|
syncNodeStopElectTimer(pSyncNode);
|
||||||
|
|
||||||
|
// stop heartbeat timer
|
||||||
|
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -2825,11 +2846,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
|
||||||
syncNodeResetElectTimer(ths);
|
syncNodeResetElectTimer(ths);
|
||||||
ths->minMatchIndex = pMsg->minMatchIndex;
|
ths->minMatchIndex = pMsg->minMatchIndex;
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
|
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
syncNodeFollowerCommit(ths, pMsg->commitIndex);
|
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
|
||||||
|
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
|
||||||
|
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
|
||||||
|
pSyncMsg->fcIndex = pMsg->commitIndex;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsgLocalCmd;
|
||||||
|
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
|
||||||
|
|
||||||
|
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
|
||||||
|
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
|
||||||
|
if (code != 0) {
|
||||||
|
sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
|
||||||
|
rpcFreeCont(rpcMsgLocalCmd.pCont);
|
||||||
|
} else {
|
||||||
|
sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
|
||||||
|
@ -2883,6 +2918,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
|
||||||
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
||||||
syncNodeStepDown(ths, pMsg->sdNewTerm);
|
syncNodeStepDown(ths, pMsg->sdNewTerm);
|
||||||
|
|
||||||
|
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
|
||||||
|
syncNodeFollowerCommit(ths, pMsg->fcIndex);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
syncNodeErrorLog(ths, "error local cmd");
|
syncNodeErrorLog(ths, "error local cmd");
|
||||||
}
|
}
|
||||||
|
|
|
@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
|
||||||
const char* syncLocalCmdGetStr(int32_t cmd) {
|
const char* syncLocalCmdGetStr(int32_t cmd) {
|
||||||
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
|
||||||
return "step-down";
|
return "step-down";
|
||||||
|
} else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
|
||||||
|
return "follower-commit";
|
||||||
}
|
}
|
||||||
|
|
||||||
return "unknown-local-cmd";
|
return "unknown-local-cmd";
|
||||||
|
@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
|
||||||
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
|
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
|
|
@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() {
|
||||||
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||||
pMsg->destId.vgId = 100;
|
pMsg->destId.vgId = 100;
|
||||||
pMsg->sdNewTerm = 123;
|
pMsg->sdNewTerm = 123;
|
||||||
|
pMsg->fcIndex = 456;
|
||||||
pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
|
pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
|
||||||
|
|
||||||
return pMsg;
|
return pMsg;
|
||||||
|
|
Loading…
Reference in New Issue