fix: not post sem if transId is 0
This commit is contained in:
parent
8bdec5f2f2
commit
c6bbd18924
|
@ -71,8 +71,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
mInfo("trans:%d, is proposed and post sem", transId);
|
mInfo("trans:%d, is proposed and post sem", transId);
|
||||||
}
|
}
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
taosWUnLockLatch(&pMgmt->lock);
|
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
} else {
|
} else {
|
||||||
taosWUnLockLatch(&pMgmt->lock);
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
@ -113,27 +113,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {
|
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {}
|
||||||
SMnode *pMnode = pFsm->data;
|
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
|
||||||
|
|
||||||
pMgmt->errCode = cbMeta->code;
|
|
||||||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
|
|
||||||
cbMeta->code, cbMeta->index, cbMeta->term);
|
|
||||||
|
|
||||||
taosWLockLatch(&pMgmt->lock);
|
|
||||||
if (pMgmt->transId == -1) {
|
|
||||||
if (pMgmt->errCode != 0) {
|
|
||||||
mError("trans:-1, failed to propose sync reconfig since %s, post sem", tstrerror(pMgmt->errCode));
|
|
||||||
} else {
|
|
||||||
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " post sem",
|
|
||||||
pMgmt->transId, cbMeta->code, cbMeta->index, cbMeta->term);
|
|
||||||
}
|
|
||||||
pMgmt->transId = 0;
|
|
||||||
tsem_post(&pMgmt->syncSem);
|
|
||||||
}
|
|
||||||
taosWUnLockLatch(&pMgmt->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
mInfo("start to read snapshot from sdb");
|
mInfo("start to read snapshot from sdb");
|
||||||
|
@ -179,11 +159,14 @@ void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
||||||
|
|
||||||
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
mInfo("vgId:1, become follower and post sem");
|
mInfo("vgId:1, become follower");
|
||||||
|
|
||||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||||
if (pMnode->syncMgmt.transId != 0) {
|
if (pMnode->syncMgmt.transId != 0) {
|
||||||
|
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader",
|
||||||
|
pMnode->syncMgmt.transId);
|
||||||
pMnode->syncMgmt.transId = 0;
|
pMnode->syncMgmt.transId = 0;
|
||||||
|
pMnode->syncMgmt.errCode = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
taosWUnLockLatch(&pMnode->syncMgmt.lock);
|
||||||
|
@ -292,6 +275,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
mInfo("trans:%d, is proposing and wait sem", pMgmt->transId);
|
||||||
tsem_wait(&pMgmt->syncSem);
|
tsem_wait(&pMgmt->syncSem);
|
||||||
} else if (code > 0) {
|
} else if (code > 0) {
|
||||||
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
|
mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
|
||||||
|
@ -301,13 +285,17 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
|
||||||
code = 0;
|
code = 0;
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
} else {
|
||||||
|
taosWLockLatch(&pMgmt->lock);
|
||||||
|
mInfo("trans:%d, failed to proposed since %s", transId, terrstr());
|
||||||
|
pMgmt->transId = 0;
|
||||||
|
taosWUnLockLatch(&pMgmt->lock);
|
||||||
|
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {
|
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_APP_ERROR;
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rpcFreeCont(req.pCont);
|
rpcFreeCont(req.pCont);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -315,6 +303,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMgmt->errCode != 0) terrno = pMgmt->errCode;
|
||||||
return pMgmt->errCode;
|
return pMgmt->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,6 +317,7 @@ void mndSyncStart(SMnode *pMnode) {
|
||||||
void mndSyncStop(SMnode *pMnode) {
|
void mndSyncStop(SMnode *pMnode) {
|
||||||
taosWLockLatch(&pMnode->syncMgmt.lock);
|
taosWLockLatch(&pMnode->syncMgmt.lock);
|
||||||
if (pMnode->syncMgmt.transId != 0) {
|
if (pMnode->syncMgmt.transId != 0) {
|
||||||
|
mInfo("vgId:1, is stopped and post sem, trans:%d", pMnode->syncMgmt.transId);
|
||||||
pMnode->syncMgmt.transId = 0;
|
pMnode->syncMgmt.transId = 0;
|
||||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||||
}
|
}
|
||||||
|
|
|
@ -778,7 +778,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||||
mInfo("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
|
mInfo("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
|
||||||
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
|
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to sync, errno:%s code:%s", pTrans->id, terrstr(), tstrerror(code));
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1083,6 +1083,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
||||||
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
|
sError("failed to create raft cfg file. configPath: %s", pSyncNode->configPath);
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
if (pSyncInfo->syncCfg.replicaNum == 0) {
|
||||||
|
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// update syncCfg by raft_config.json
|
// update syncCfg by raft_config.json
|
||||||
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
||||||
|
@ -2013,8 +2016,27 @@ inline bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
|
||||||
return b1;
|
return b1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool syncIsConfigChanged(const SSyncCfg* pOldCfg, const SSyncCfg* pNewCfg) {
|
||||||
|
if (pOldCfg->replicaNum != pNewCfg->replicaNum) return true;
|
||||||
|
if (pOldCfg->myIndex != pNewCfg->myIndex) return true;
|
||||||
|
for (int32_t i = 0; i < pOldCfg->replicaNum; ++i) {
|
||||||
|
const SNodeInfo* pOldInfo = &pOldCfg->nodeInfo[i];
|
||||||
|
const SNodeInfo* pNewInfo = &pNewCfg->nodeInfo[i];
|
||||||
|
if (strcmp(pOldInfo->nodeFqdn, pNewInfo->nodeFqdn) != 0) return true;
|
||||||
|
if (pOldInfo->nodePort != pNewInfo->nodePort) return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
|
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
|
||||||
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
|
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
|
||||||
|
#if 0
|
||||||
|
if (!syncIsConfigChanged(&oldConfig, pNewConfig)) {
|
||||||
|
sInfo("vgId:1, sync not reconfig since not changed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
pSyncNode->pRaftCfg->cfg = *pNewConfig;
|
pSyncNode->pRaftCfg->cfg = *pNewConfig;
|
||||||
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
|
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue