enh(sync): add sync pre stop
This commit is contained in:
parent
794bca7974
commit
1a4b7622e4
|
@ -211,6 +211,7 @@ void syncCleanUp();
|
||||||
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
||||||
void syncStart(int64_t rid);
|
void syncStart(int64_t rid);
|
||||||
void syncStop(int64_t rid);
|
void syncStop(int64_t rid);
|
||||||
|
void syncPreStop(int64_t rid);
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||||
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
|
||||||
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
|
||||||
|
|
|
@ -429,6 +429,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
void mndPreClose(SMnode *pMnode) {
|
void mndPreClose(SMnode *pMnode) {
|
||||||
if (pMnode != NULL) {
|
if (pMnode != NULL) {
|
||||||
syncLeaderTransfer(pMnode->syncMgmt.sync);
|
syncLeaderTransfer(pMnode->syncMgmt.sync);
|
||||||
|
syncPreStop(pMnode->syncMgmt.sync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -245,6 +245,7 @@ _err:
|
||||||
void vnodePreClose(SVnode *pVnode) {
|
void vnodePreClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
syncLeaderTransfer(pVnode->sync);
|
syncLeaderTransfer(pVnode->sync);
|
||||||
|
syncPreStop(pVnode->sync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -218,6 +218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
|
||||||
void syncNodeStart(SSyncNode* pSyncNode);
|
void syncNodeStart(SSyncNode* pSyncNode);
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||||
void syncNodeClose(SSyncNode* pSyncNode);
|
void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
|
void syncNodePreClose(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
||||||
|
|
||||||
// option
|
// option
|
||||||
|
|
|
@ -81,6 +81,15 @@ void syncStop(int64_t rid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncPreStop(int64_t rid) {
|
||||||
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
|
if (pSyncNode == NULL) return;
|
||||||
|
|
||||||
|
syncNodePreClose(pSyncNode);
|
||||||
|
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
|
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
|
||||||
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
|
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
|
||||||
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
|
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
|
||||||
|
@ -1222,6 +1231,14 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||||
|
// stop elect timer
|
||||||
|
syncNodeStopElectTimer(pSyncNode);
|
||||||
|
|
||||||
|
// stop heartbeat timer
|
||||||
|
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue