fix/TD-30989
This commit is contained in:
parent
fc4d758a9f
commit
43524f394d
|
@ -212,8 +212,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
|
||||||
mndTransRefresh(pMnode, pTrans);
|
mndTransRefresh(pMnode, pTrans);
|
||||||
|
|
||||||
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
|
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
|
||||||
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
|
code = sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
|
||||||
code = 0;
|
|
||||||
|
|
||||||
_OUT:
|
_OUT:
|
||||||
if (pTrans) mndReleaseTrans(pMnode, pTrans);
|
if (pTrans) mndReleaseTrans(pMnode, pTrans);
|
||||||
|
@ -222,7 +221,7 @@ _OUT:
|
||||||
|
|
||||||
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
if (pMgmt->transId == 0) {
|
if (pMgmt->transId == 0) {
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
@ -232,7 +231,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
pMgmt->errCode = code;
|
pMgmt->errCode = code;
|
||||||
tsem_post(&pMgmt->syncSem);
|
(void)tsem_post(&pMgmt->syncSem);
|
||||||
|
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
|
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
|
||||||
|
@ -241,7 +240,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_OUT:
|
_OUT:
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,7 +303,7 @@ void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
|
||||||
} else {
|
} else {
|
||||||
mInfo("vgId:1, sync restore finished");
|
mInfo("vgId:1, sync restore finished");
|
||||||
}
|
}
|
||||||
mndRefreshUserIpWhiteList(pMnode);
|
(void)mndRefreshUserIpWhiteList(pMnode);
|
||||||
|
|
||||||
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
|
ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
|
||||||
}
|
}
|
||||||
|
@ -350,16 +349,16 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
mInfo("vgId:1, become follower");
|
mInfo("vgId:1, become follower");
|
||||||
|
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
if (pMgmt->transId != 0) {
|
if (pMgmt->transId != 0) {
|
||||||
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
|
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
|
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
tsem_post(&pMgmt->syncSem);
|
(void)tsem_post(&pMgmt->syncSem);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
|
@ -367,16 +366,16 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
mInfo("vgId:1, become learner");
|
mInfo("vgId:1, become learner");
|
||||||
|
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
if (pMgmt->transId != 0) {
|
if (pMgmt->transId != 0) {
|
||||||
mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
|
mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
|
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
tsem_post(&pMgmt->syncSem);
|
(void)tsem_post(&pMgmt->syncSem);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
|
@ -435,12 +434,12 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t mndInitSync(SMnode *pMnode) {
|
int32_t mndInitSync(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
taosThreadMutexInit(&pMgmt->lock, NULL);
|
(void)taosThreadMutexInit(&pMgmt->lock, NULL);
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
|
|
||||||
SSyncInfo syncInfo = {
|
SSyncInfo syncInfo = {
|
||||||
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
|
.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
|
||||||
|
@ -477,7 +476,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tsem_init(&pMgmt->syncSem, 0, 0);
|
(void)tsem_init(&pMgmt->syncSem, 0, 0);
|
||||||
pMgmt->sync = syncOpen(&syncInfo, true);
|
pMgmt->sync = syncOpen(&syncInfo, true);
|
||||||
if (pMgmt->sync <= 0) {
|
if (pMgmt->sync <= 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
|
@ -495,15 +494,15 @@ void mndCleanupSync(SMnode *pMnode) {
|
||||||
syncStop(pMgmt->sync);
|
syncStop(pMgmt->sync);
|
||||||
mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
|
mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
|
||||||
|
|
||||||
tsem_destroy(&pMgmt->syncSem);
|
(void)tsem_destroy(&pMgmt->syncSem);
|
||||||
taosThreadMutexDestroy(&pMgmt->lock);
|
(void)taosThreadMutexDestroy(&pMgmt->lock);
|
||||||
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncCheckTimeout(SMnode *pMnode) {
|
void mndSyncCheckTimeout(SMnode *pMnode) {
|
||||||
mTrace("check sync timeout");
|
mTrace("check sync timeout");
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
if (pMgmt->transId != 0) {
|
if (pMgmt->transId != 0) {
|
||||||
int32_t curSec = taosGetTimestampSec();
|
int32_t curSec = taosGetTimestampSec();
|
||||||
int32_t delta = curSec - pMgmt->transSec;
|
int32_t delta = curSec - pMgmt->transSec;
|
||||||
|
@ -515,7 +514,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
terrno = TSDB_CODE_SYN_TIMEOUT;
|
terrno = TSDB_CODE_SYN_TIMEOUT;
|
||||||
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
||||||
tsem_post(&pMgmt->syncSem);
|
(void)tsem_post(&pMgmt->syncSem);
|
||||||
} else {
|
} else {
|
||||||
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
||||||
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
|
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
|
||||||
|
@ -523,7 +522,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
||||||
} else {
|
} else {
|
||||||
// mTrace("check sync timeout msg, no trans waiting for confirm");
|
// mTrace("check sync timeout msg, no trans waiting for confirm");
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
|
@ -536,12 +535,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
memcpy(req.pCont, pRaw, req.contLen);
|
memcpy(req.pCont, pRaw, req.contLen);
|
||||||
|
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
pMgmt->errCode = 0;
|
pMgmt->errCode = 0;
|
||||||
|
|
||||||
if (pMgmt->transId != 0) {
|
if (pMgmt->transId != 0) {
|
||||||
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
|
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
rpcFreeCont(req.pCont);
|
rpcFreeCont(req.pCont);
|
||||||
TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
|
TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED);
|
||||||
}
|
}
|
||||||
|
@ -555,23 +554,24 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
|
mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
|
||||||
pMgmt->transSeq = seq;
|
pMgmt->transSeq = seq;
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
tsem_wait(&pMgmt->syncSem);
|
(void)tsem_wait(&pMgmt->syncSem);
|
||||||
} else if (code > 0) {
|
} else if (code > 0) {
|
||||||
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
|
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
if (code == 0) {
|
||||||
code = 0;
|
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
mError("trans:%d, failed to proposed since %s", transId, terrstr());
|
mError("trans:%d, failed to proposed since %s", transId, terrstr());
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
if (terrno == 0) {
|
if (terrno == 0) {
|
||||||
terrno = TSDB_CODE_APP_ERROR;
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -600,15 +600,15 @@ void mndSyncStart(SMnode *pMnode) {
|
||||||
void mndSyncStop(SMnode *pMnode) {
|
void mndSyncStop(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
(void)taosThreadMutexLock(&pMgmt->lock);
|
||||||
if (pMgmt->transId != 0) {
|
if (pMgmt->transId != 0) {
|
||||||
mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
|
mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
|
pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
|
||||||
tsem_post(&pMgmt->syncSem);
|
(void)tsem_post(&pMgmt->syncSem);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndIsLeader(SMnode *pMnode) {
|
bool mndIsLeader(SMnode *pMnode) {
|
||||||
|
|
Loading…
Reference in New Issue