fix: initialize and update pMnode->applied properly
This commit is contained in:
parent
96e70c2795
commit
66631229eb
|
@ -380,11 +380,13 @@ static int32_t mndInitSdb(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndOpenSdb(SMnode *pMnode) {
|
static int32_t mndOpenSdb(SMnode *pMnode) {
|
||||||
|
int32_t code = 0;
|
||||||
if (!pMnode->deploy) {
|
if (!pMnode->deploy) {
|
||||||
return sdbReadFile(pMnode->pSdb);
|
code = sdbReadFile(pMnode->pSdb);
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
atomic_store_64(&pMnode->applied, pMnode->pSdb->commitIndex);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCleanupSdb(SMnode *pMnode) {
|
static void mndCleanupSdb(SMnode *pMnode) {
|
||||||
|
|
|
@ -130,6 +130,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
|
||||||
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
pMsg->info.conn.applyIndex = pMeta->index;
|
||||||
|
pMsg->info.conn.applyTerm = pMeta->term;
|
||||||
|
|
||||||
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
|
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
|
||||||
|
|
||||||
if (!syncUtilUserCommit(pMsg->msgType)) {
|
if (!syncUtilUserCommit(pMsg->msgType)) {
|
||||||
|
@ -176,6 +179,8 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:1, sync restore finished");
|
mInfo("vgId:1, sync restore finished");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
|
@ -331,10 +336,6 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
pMnode->pSdb->sync = pMgmt->sync;
|
pMnode->pSdb->sync = pMgmt->sync;
|
||||||
|
|
||||||
SSnapshot snap = {0};
|
|
||||||
sdbGetCommitInfo(pMnode->pSdb, &snap.lastApplyIndex, &snap.lastApplyTerm, &snap.lastConfigIndex);
|
|
||||||
atomic_store_64(&pMnode->applied, snap.lastApplyIndex);
|
|
||||||
|
|
||||||
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
|
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue