Merge pull request #19827 from taosdata/fix/TD-22240
fix: should not execute trans if not leader
This commit is contained in:
commit
dda20bd5cb
|
@ -81,7 +81,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||||
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
|
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
|
||||||
void mndTransPullup(SMnode *pMnode);
|
void mndTransPullup(SMnode *pMnode);
|
||||||
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
||||||
void mndTransExecute(SMnode *pMnode, STrans *pTrans);
|
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader);
|
||||||
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
|
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -116,7 +116,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
|
||||||
if (pTrans != NULL) {
|
if (pTrans != NULL) {
|
||||||
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
|
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
|
||||||
transId, pTrans->createdTime, pMgmt->transId);
|
transId, pTrans->createdTime, pMgmt->transId);
|
||||||
mndTransExecute(pMnode, pTrans);
|
mndTransExecute(pMnode, pTrans, false);
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
|
// sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -905,7 +905,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||||
pTrans->rpcRsp = NULL;
|
pTrans->rpcRsp = NULL;
|
||||||
pTrans->rpcRspLen = 0;
|
pTrans->rpcRspLen = 0;
|
||||||
|
|
||||||
mndTransExecute(pMnode, pNew);
|
mndTransExecute(pMnode, pNew, true);
|
||||||
mndReleaseTrans(pMnode, pNew);
|
mndReleaseTrans(pMnode, pNew);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1068,7 +1068,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
||||||
|
|
||||||
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage),
|
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage),
|
||||||
action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
|
action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
|
||||||
mndTransExecute(pMnode, pTrans);
|
mndTransExecute(pMnode, pTrans, true);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
@ -1502,12 +1502,12 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
|
|
||||||
while (continueExec) {
|
while (continueExec) {
|
||||||
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64, pTrans->id, mndTransStr(pTrans->stage),
|
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " leader:%d", pTrans->id,
|
||||||
pTrans->createdTime);
|
mndTransStr(pTrans->stage), pTrans->createdTime, isLeader);
|
||||||
pTrans->lastExecTime = taosGetTimestampMs();
|
pTrans->lastExecTime = taosGetTimestampMs();
|
||||||
switch (pTrans->stage) {
|
switch (pTrans->stage) {
|
||||||
case TRN_STAGE_PREPARE:
|
case TRN_STAGE_PREPARE:
|
||||||
|
@ -1517,13 +1517,23 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
||||||
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
|
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_COMMIT:
|
case TRN_STAGE_COMMIT:
|
||||||
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
|
if (isLeader) {
|
||||||
|
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
|
||||||
|
} else {
|
||||||
|
mInfo("trans:%d, can not commit since not leader", pTrans->id);
|
||||||
|
continueExec = false;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_COMMIT_ACTION:
|
case TRN_STAGE_COMMIT_ACTION:
|
||||||
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
|
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_ROLLBACK:
|
case TRN_STAGE_ROLLBACK:
|
||||||
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
|
if (isLeader) {
|
||||||
|
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
|
||||||
|
} else {
|
||||||
|
mInfo("trans:%d, can not rollback since not leader", pTrans->id);
|
||||||
|
continueExec = false;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_UNDO_ACTION:
|
case TRN_STAGE_UNDO_ACTION:
|
||||||
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
|
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
|
||||||
|
@ -1566,7 +1576,7 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
||||||
pAction->errCode = 0;
|
pAction->errCode = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransExecute(pMnode, pTrans);
|
mndTransExecute(pMnode, pTrans, true);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1624,7 +1634,7 @@ void mndTransPullup(SMnode *pMnode) {
|
||||||
int32_t *pTransId = taosArrayGet(pArray, i);
|
int32_t *pTransId = taosArrayGet(pArray, i);
|
||||||
STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
|
STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
|
||||||
if (pTrans != NULL) {
|
if (pTrans != NULL) {
|
||||||
mndTransExecute(pMnode, pTrans);
|
mndTransExecute(pMnode, pTrans, true);
|
||||||
}
|
}
|
||||||
mndReleaseTrans(pMnode, pTrans);
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue