Merge pull request #25456 from taosdata/FIX/TD-29704-3.0
fix: execute undo actions of TRN_EXEC_SERIAL trans sequentially
This commit is contained in:
commit
c085efb4f8
|
@ -166,7 +166,7 @@ typedef struct {
|
|||
int32_t failedTimes;
|
||||
void* rpcRsp;
|
||||
int32_t rpcRspLen;
|
||||
int32_t redoActionPos;
|
||||
int32_t actionPos;
|
||||
SArray* prepareActions;
|
||||
SArray* redoActions;
|
||||
SArray* undoActions;
|
||||
|
|
|
@ -169,7 +169,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
|
|||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->actionPos, _OVER)
|
||||
|
||||
int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions);
|
||||
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
|
||||
|
@ -317,7 +317,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->actionPos, _OVER)
|
||||
|
||||
if (sver > TRANS_VER1_NUMBER) {
|
||||
SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER)
|
||||
|
@ -525,7 +525,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
|||
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
|
||||
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
|
||||
pOld->stage = pNew->stage;
|
||||
pOld->redoActionPos = pNew->redoActionPos;
|
||||
pOld->actionPos = pNew->actionPos;
|
||||
|
||||
if (pOld->stage == TRN_STAGE_COMMIT) {
|
||||
pOld->stage = TRN_STAGE_COMMIT_ACTION;
|
||||
|
@ -1360,22 +1360,19 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArray *pActions, bool topHalf) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
|
||||
int32_t numOfActions = taosArrayGetSize(pActions);
|
||||
if (numOfActions == 0) return code;
|
||||
|
||||
taosThreadMutexLock(&pTrans->mutex);
|
||||
|
||||
if (pTrans->redoActionPos >= numOfActions) {
|
||||
taosThreadMutexUnlock(&pTrans->mutex);
|
||||
if (pTrans->actionPos >= numOfActions) {
|
||||
return code;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos);
|
||||
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos);
|
||||
|
||||
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
|
||||
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
|
||||
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
|
||||
STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos);
|
||||
|
||||
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
|
||||
if (code == 0) {
|
||||
|
@ -1409,14 +1406,14 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans,
|
|||
|
||||
if (code == 0) {
|
||||
pTrans->code = 0;
|
||||
pTrans->redoActionPos++;
|
||||
pTrans->actionPos++;
|
||||
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
|
||||
pAction->id);
|
||||
taosThreadMutexUnlock(&pTrans->mutex);
|
||||
code = mndTransSync(pMnode, pTrans);
|
||||
taosThreadMutexLock(&pTrans->mutex);
|
||||
if (code != 0) {
|
||||
pTrans->redoActionPos--;
|
||||
pTrans->actionPos--;
|
||||
pTrans->code = terrno;
|
||||
mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id,
|
||||
mndTransStr(pAction->stage), pAction->id, terrstr());
|
||||
|
@ -1442,8 +1439,26 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans,
|
|||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTrans->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
taosThreadMutexLock(&pTrans->mutex);
|
||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
|
||||
code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTrans->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
taosThreadMutexLock(&pTrans->mutex);
|
||||
if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
|
||||
code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf);
|
||||
}
|
||||
taosThreadMutexUnlock(&pTrans->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1563,13 +1578,22 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, boo
|
|||
|
||||
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||
bool continueExec = true;
|
||||
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
|
||||
int32_t code = 0;
|
||||
|
||||
if (pTrans->exec == TRN_EXEC_SERIAL) {
|
||||
code = mndTransExecuteUndoActionsSerial(pMnode, pTrans, topHalf);
|
||||
} else {
|
||||
code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
|
||||
}
|
||||
|
||||
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
|
||||
terrno = code;
|
||||
|
||||
if (code == 0) {
|
||||
pTrans->stage = TRN_STAGE_PRE_FINISH;
|
||||
mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id);
|
||||
continueExec = true;
|
||||
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) {
|
||||
mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
|
||||
continueExec = false;
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue