Merge pull request #16044 from taosdata/fix/mnode
fix: deadlock of mnode if its state changed
This commit is contained in:
commit
5b300b5819
|
@ -87,10 +87,11 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
tsem_t syncSem;
|
tsem_t syncSem;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
bool standby;
|
|
||||||
SReplica replica;
|
SReplica replica;
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
int32_t transId;
|
int32_t transId;
|
||||||
|
SRWLatch lock;
|
||||||
|
int8_t standby;
|
||||||
int8_t leaderTransferFinish;
|
int8_t leaderTransferFinish;
|
||||||
} SSyncMgmt;
|
} SSyncMgmt;
|
||||||
|
|
||||||
|
|
|
@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
|
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosRLockLatch(&pMgmt->lock);
|
||||||
if (transId <= 0) {
|
if (transId <= 0) {
|
||||||
|
taosRUnLockLatch(&pMgmt->lock);
|
||||||
mError("trans:%d, invalid commit msg", transId);
|
mError("trans:%d, invalid commit msg", transId);
|
||||||
} else if (transId == pMgmt->transId) {
|
} else if (transId == pMgmt->transId) {
|
||||||
|
taosRUnLockLatch(&pMgmt->lock);
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
||||||
}
|
}
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
} else {
|
} else {
|
||||||
|
taosRUnLockLatch(&pMgmt->lock);
|
||||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
if (pTrans != NULL) {
|
if (pTrans != NULL) {
|
||||||
mDebug("trans:%d, execute in mnode which not leader", transId);
|
mDebug("trans:%d, execute in mnode which not leader", transId);
|
||||||
|
@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
||||||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
||||||
cbMeta.code, cbMeta.index, cbMeta.term);
|
cbMeta.code, cbMeta.index, cbMeta.term);
|
||||||
|
|
||||||
|
taosWLockLatch(&pMgmt->lock);
|
||||||
if (pMgmt->transId == -1) {
|
if (pMgmt->transId == -1) {
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
|
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
|
||||||
|
@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
|
@ -170,12 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
mDebug("vgId:1, become follower");
|
mDebug("vgId:1, become follower");
|
||||||
|
|
||||||
// clear old leader resource
|
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||||
|
if (pMnode->syncMgmt.transId != 0) {
|
||||||
|
pMnode->syncMgmt.transId = 0;
|
||||||
|
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
static void mndBecomeLeader(struct SSyncFSM *pFsm) {
|
||||||
SMnode *pMnode = pFsm->data;
|
|
||||||
mDebug("vgId:1, become leader");
|
mDebug("vgId:1, become leader");
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
|
||||||
|
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||||
|
if (pMnode->syncMgmt.transId != 0) {
|
||||||
|
pMnode->syncMgmt.transId = 0;
|
||||||
|
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
|
@ -202,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t mndInitSync(SMnode *pMnode) {
|
int32_t mndInitSync(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
taosInitRWLatch(&pMgmt->lock);
|
||||||
|
pMgmt->transId = 0;
|
||||||
|
|
||||||
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||||
|
@ -254,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
memcpy(req.pCont, pRaw, req.contLen);
|
memcpy(req.pCont, pRaw, req.contLen);
|
||||||
|
|
||||||
pMgmt->errCode = 0;
|
pMgmt->errCode = 0;
|
||||||
|
taosWLockLatch(&pMgmt->lock);
|
||||||
pMgmt->transId = transId;
|
pMgmt->transId = transId;
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
mTrace("trans:%d, will be proposed", pMgmt->transId);
|
mTrace("trans:%d, will be proposed", pMgmt->transId);
|
||||||
|
|
||||||
const bool isWeak = false;
|
const bool isWeak = false;
|
||||||
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
tsem_wait(&pMgmt->syncSem);
|
tsem_wait(&pMgmt->syncSem);
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||||
|
@ -286,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncStop(SMnode *pMnode) {
|
void mndSyncStop(SMnode *pMnode) {
|
||||||
|
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||||
if (pMnode->syncMgmt.transId != 0) {
|
if (pMnode->syncMgmt.transId != 0) {
|
||||||
pMnode->syncMgmt.transId = 0;
|
pMnode->syncMgmt.transId = 0;
|
||||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndIsMaster(SMnode *pMnode) {
|
bool mndIsMaster(SMnode *pMnode) {
|
||||||
|
|
|
@ -308,7 +308,8 @@ struct SVnode {
|
||||||
SSink* pSink;
|
SSink* pSink;
|
||||||
tsem_t canCommit;
|
tsem_t canCommit;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
int32_t blockCount;
|
SRWLatch lock;
|
||||||
|
bool blocked;
|
||||||
bool restored;
|
bool restored;
|
||||||
tsem_t syncSem;
|
tsem_t syncSem;
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
|
|
|
@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
pVnode->state.commitTerm = info.state.commitTerm;
|
pVnode->state.commitTerm = info.state.commitTerm;
|
||||||
pVnode->pTfs = pTfs;
|
pVnode->pTfs = pTfs;
|
||||||
pVnode->msgCb = msgCb;
|
pVnode->msgCb = msgCb;
|
||||||
pVnode->blockCount = 0;
|
taosInitRWLatch(&pVnode->lock);
|
||||||
|
pVnode->blocked = false;
|
||||||
|
|
||||||
tsem_init(&pVnode->syncSem, 0, 0);
|
tsem_init(&pVnode->syncSem, 0, 0);
|
||||||
tsem_init(&(pVnode->canCommit), 0, 1);
|
tsem_init(&(pVnode->canCommit), 0, 1);
|
||||||
|
|
|
@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
|
||||||
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
taosWLockLatch(&pVnode->lock);
|
||||||
|
if (!pVnode->blocked) {
|
||||||
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
pVnode->blockCount = 1;
|
pVnode->blocked = true;
|
||||||
|
taosWUnLockLatch(&pVnode->lock);
|
||||||
tsem_wait(&pVnode->syncSem);
|
tsem_wait(&pVnode->syncSem);
|
||||||
|
} else {
|
||||||
|
taosWUnLockLatch(&pVnode->lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
if (pVnode->blockCount) {
|
taosWLockLatch(&pVnode->lock);
|
||||||
|
if (pVnode->blocked) {
|
||||||
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
pVnode->blockCount = 0;
|
pVnode->blocked = false;
|
||||||
tsem_post(&pVnode->syncSem);
|
tsem_post(&pVnode->syncSem);
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pVnode->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
vDebug("vgId:%d, become follower", pVnode->config.vgId);
|
vDebug("vgId:%d, become follower", pVnode->config.vgId);
|
||||||
|
|
||||||
// clear old leader resource
|
// clear old leader resource
|
||||||
|
taosWLockLatch(&pVnode->lock);
|
||||||
|
if (pVnode->blocked) {
|
||||||
|
pVnode->blocked = false;
|
||||||
|
tsem_post(&pVnode->syncSem);
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pVnode->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
|
||||||
|
|
|
@ -210,6 +210,8 @@ if $rows != 3 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql_error select * from performance_schema.PERF_OFFSETS;
|
||||||
|
|
||||||
sql show create stable stb;
|
sql show create stable stb;
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
return -1
|
return -1
|
||||||
|
|
Loading…
Reference in New Issue