Merge pull request #16061 from taosdata/fix/mnode
fix: deadlock of mnode if its state changed
This commit is contained in:
commit
d50f36ba97
|
@ -742,7 +742,9 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
pMgmt->errCode = 0;
|
pMgmt->errCode = 0;
|
||||||
|
taosWLockLatch(&pMgmt->lock);
|
||||||
pMgmt->transId = -1;
|
pMgmt->transId = -1;
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
tsem_wait(&pMgmt->syncSem);
|
tsem_wait(&pMgmt->syncSem);
|
||||||
mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
|
mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
|
||||||
terrno = pMgmt->errCode;
|
terrno = pMgmt->errCode;
|
||||||
|
|
|
@ -60,22 +60,24 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
|
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->lock);
|
taosWLockLatch(&pMgmt->lock);
|
||||||
if (transId <= 0) {
|
if (transId <= 0) {
|
||||||
taosRUnLockLatch(&pMgmt->lock);
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
mError("trans:%d, invalid commit msg", transId);
|
mError("trans:%d, invalid commit msg", transId);
|
||||||
} else if (transId == pMgmt->transId) {
|
} else if (transId == pMgmt->transId) {
|
||||||
taosRUnLockLatch(&pMgmt->lock);
|
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
|
||||||
|
} else {
|
||||||
|
mInfo("trans:%d, is proposed and post sem", transId, tstrerror(pMgmt->errCode));
|
||||||
}
|
}
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
} else {
|
} else {
|
||||||
taosRUnLockLatch(&pMgmt->lock);
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
if (pTrans != NULL) {
|
if (pTrans != NULL) {
|
||||||
mDebug("trans:%d, execute in mnode which not leader", transId);
|
mInfo("trans:%d, execute in mnode which not leader", transId);
|
||||||
mndTransExecute(pMnode, pTrans);
|
mndTransExecute(pMnode, pTrans);
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
|
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
|
||||||
|
@ -122,7 +124,10 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
||||||
taosWLockLatch(&pMgmt->lock);
|
taosWLockLatch(&pMgmt->lock);
|
||||||
if (pMgmt->transId == -1) {
|
if (pMgmt->transId == -1) {
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
|
mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode));
|
||||||
|
} else {
|
||||||
|
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
|
||||||
|
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
|
||||||
}
|
}
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
|
@ -174,7 +179,7 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
||||||
|
|
||||||
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
mDebug("vgId:1, become follower");
|
mDebug("vgId:1, become follower and post sem");
|
||||||
|
|
||||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||||
if (pMnode->syncMgmt.transId != 0) {
|
if (pMnode->syncMgmt.transId != 0) {
|
||||||
|
@ -187,13 +192,6 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
||||||
mDebug("vgId:1, become leader");
|
mDebug("vgId:1, become leader");
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
|
||||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
|
||||||
if (pMnode->syncMgmt.transId != 0) {
|
|
||||||
pMnode->syncMgmt.transId = 0;
|
|
||||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
|
||||||
}
|
|
||||||
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
|
@ -279,9 +277,16 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
|
|
||||||
pMgmt->errCode = 0;
|
pMgmt->errCode = 0;
|
||||||
taosWLockLatch(&pMgmt->lock);
|
taosWLockLatch(&pMgmt->lock);
|
||||||
pMgmt->transId = transId;
|
if (pMgmt->transId != 0) {
|
||||||
|
mError("trans:%d, can't be proposed since trans:%s alrady waiting for confirm", transId, pMgmt->transId);
|
||||||
taosWUnLockLatch(&pMgmt->lock);
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
mTrace("trans:%d, will be proposed", pMgmt->transId);
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
pMgmt->transId = transId;
|
||||||
|
mDebug("trans:%d, will be proposed", pMgmt->transId);
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
|
}
|
||||||
|
|
||||||
const bool isWeak = false;
|
const bool isWeak = false;
|
||||||
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
||||||
|
|
|
@ -688,6 +688,7 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
taosThreadMutexLock(&pVnode->lock);
|
taosThreadMutexLock(&pVnode->lock);
|
||||||
if (pVnode->blocked) {
|
if (pVnode->blocked) {
|
||||||
pVnode->blocked = false;
|
pVnode->blocked = false;
|
||||||
|
vDebug("vgId:%d, become follower and post block", pVnode->config.vgId);
|
||||||
tsem_post(&pVnode->syncSem);
|
tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pVnode->lock);
|
taosThreadMutexUnlock(&pVnode->lock);
|
||||||
|
@ -696,6 +697,13 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||||
|
|
||||||
|
// taosThreadMutexLock(&pVnode->lock);
|
||||||
|
// if (pVnode->blocked) {
|
||||||
|
// pVnode->blocked = false;
|
||||||
|
// tsem_post(&pVnode->syncSem);
|
||||||
|
// }
|
||||||
|
// taosThreadMutexUnlock(&pVnode->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
|
|
Loading…
Reference in New Issue