fix: initialize vnode state applyTerm with commitTerm in vnodeOpen
This commit is contained in:
parent
decb17fcb1
commit
3cbe109e4b
|
@ -209,8 +209,8 @@ int vnodeCommit(SVnode *pVnode) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
char dir[TSDB_FILENAME_LEN];
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
|
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
|
||||||
pVnode->state.applied);
|
pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);
|
||||||
|
|
||||||
// persist wal before starting
|
// persist wal before starting
|
||||||
if (walPersist(pVnode->pWal) < 0) {
|
if (walPersist(pVnode->pWal) < 0) {
|
||||||
|
|
|
@ -144,9 +144,9 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
pVnode->config = info.config;
|
pVnode->config = info.config;
|
||||||
pVnode->state.committed = info.state.committed;
|
pVnode->state.committed = info.state.committed;
|
||||||
pVnode->state.commitTerm = info.state.commitTerm;
|
pVnode->state.commitTerm = info.state.commitTerm;
|
||||||
pVnode->state.applied = info.state.committed;
|
|
||||||
pVnode->state.commitID = info.state.commitID;
|
pVnode->state.commitID = info.state.commitID;
|
||||||
pVnode->state.commitTerm = info.state.commitTerm;
|
pVnode->state.applied = info.state.committed;
|
||||||
|
pVnode->state.applyTerm = info.state.commitTerm;
|
||||||
pVnode->pTfs = pTfs;
|
pVnode->pTfs = pTfs;
|
||||||
pVnode->msgCb = msgCb;
|
pVnode->msgCb = msgCb;
|
||||||
taosThreadMutexInit(&pVnode->lock, NULL);
|
taosThreadMutexInit(&pVnode->lock, NULL);
|
||||||
|
@ -269,10 +269,7 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// start the sync timer after the queue is ready
|
// start the sync timer after the queue is ready
|
||||||
int32_t vnodeStart(SVnode *pVnode) {
|
int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); }
|
||||||
vnodeSyncStart(pVnode);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeStop(SVnode *pVnode) {}
|
void vnodeStop(SVnode *pVnode) {}
|
||||||
|
|
||||||
|
|
|
@ -187,6 +187,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
|
|
||||||
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
vDebug("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
|
||||||
version);
|
version);
|
||||||
|
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
|
||||||
|
|
||||||
pVnode->state.applied = version;
|
pVnode->state.applied = version;
|
||||||
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
|
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
|
||||||
|
|
|
@ -224,6 +224,9 @@ int32_t syncLogBufferInitWithoutLock(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
|
||||||
// update startIndex
|
// update startIndex
|
||||||
pBuf->startIndex = takeDummy ? index : index + 1;
|
pBuf->startIndex = takeDummy ? index : index + 1;
|
||||||
|
|
||||||
|
sInfo("vgId:%d, init sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
|
||||||
|
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
|
|
||||||
// validate
|
// validate
|
||||||
syncLogBufferValidate(pBuf);
|
syncLogBufferValidate(pBuf);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -826,6 +829,10 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
|
||||||
|
|
||||||
pMgr->endIndex = index + 1;
|
pMgr->endIndex = index + 1;
|
||||||
if (barrier) {
|
if (barrier) {
|
||||||
|
sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64
|
||||||
|
", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")",
|
||||||
|
pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
|
||||||
|
pMgr->endIndex);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
||||||
sNInfo(ths, "timer routines");
|
sNDebug(ths, "timer routines");
|
||||||
|
|
||||||
// timer replicate
|
// timer replicate
|
||||||
syncNodeReplicate(ths);
|
syncNodeReplicate(ths);
|
||||||
|
|
Loading…
Reference in New Issue