enh: demarcate top and bottom halves of task trans execution to ensure a consistent order

This commit is contained in:
Benguang Zhao 2023-12-19 19:51:51 +08:00
parent 25ab03e5d3
commit f7ab8e99b5
3 changed files with 74 additions and 84 deletions

View File

@ -97,7 +97,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans);
SSdbRow *mndTransDecode(SSdbRaw *pRaw); SSdbRow *mndTransDecode(SSdbRaw *pRaw);
void mndTransDropData(STrans *pTrans); void mndTransDropData(STrans *pTrans);
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -208,15 +208,11 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
} }
if (pTrans->stage == TRN_STAGE_PREPARE) { if (pTrans->stage == TRN_STAGE_PREPARE) {
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans); bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false);
if (!continueExec) goto _OUT; if (!continueExec) goto _OUT;
} }
if (pTrans->id != pMgmt->transId) { mndTransRefresh(pMnode, pTrans);
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
pTrans->id, pTrans->createdTime, pMgmt->transId);
mndTransRefresh(pMnode, pTrans);
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);

View File

@ -36,21 +36,25 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray); static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray); static void mndTransDropActions(SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); } static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) {
return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf;
}
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransTimer(SRpcMsg *pReq); static int32_t mndProcessTransTimer(SRpcMsg *pReq);
@ -1090,8 +1094,9 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
} }
} }
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->rawWritten) return 0; if (pAction->rawWritten) return 0;
if (topHalf) return -1;
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
@ -1112,9 +1117,9 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
return code; return code;
} }
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0; if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode)) return -1; if (mndCannotExecuteTransAction(pMnode, topHalf)) return -1;
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
@ -1159,7 +1164,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
return code; return code;
} }
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (!topHalf) return -1;
pAction->rawWritten = 0; pAction->rawWritten = 0;
pAction->errCode = 0; pAction->errCode = 0;
mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
@ -1168,34 +1174,34 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction
return 0; return 0;
} }
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->actionType == TRANS_ACTION_RAW) { if (pAction->actionType == TRANS_ACTION_RAW) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction); return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf);
} else if (pAction->actionType == TRANS_ACTION_MSG) { } else if (pAction->actionType == TRANS_ACTION_MSG) {
return mndTransSendSingleMsg(pMnode, pTrans, pAction); return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf);
} else { } else {
return mndTransExecNullMsg(pMnode, pTrans, pAction); return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf);
} }
} }
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
int32_t numOfActions = taosArrayGetSize(pArray); int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0; int32_t code = 0;
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code != 0) break; if (code != 0) break;
} }
return code; return code;
} }
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
int32_t numOfActions = taosArrayGetSize(pArray); int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0; if (numOfActions == 0) return 0;
if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { if (mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf) != 0) {
return -1; return -1;
} }
@ -1248,31 +1254,31 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
} }
} }
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
} }
return code; return code;
} }
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute undoActions since %s", terrstr()); mError("failed to execute undoActions since %s", terrstr());
} }
return code; return code;
} }
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute commitActions since %s", terrstr()); mError("failed to execute commitActions since %s", terrstr());
} }
return code; return code;
} }
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = 0; int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code; if (numOfActions == 0) return code;
@ -1289,7 +1295,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
code = mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code == 0) { if (code == 0) {
if (pAction->msgSent) { if (pAction->msgSent) {
if (pAction->msgReceived) { if (pAction->msgReceived) {
@ -1317,14 +1323,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} }
mndSetTransLastAction(pTrans, pAction); mndSetTransLastAction(pTrans, pAction);
if (mndCannotExecuteTransAction(pMnode)) break; if (mndCannotExecuteTransAction(pMnode, topHalf)) break;
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
pTrans->redoActionPos++; pTrans->redoActionPos++;
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id); pAction->id);
taosThreadMutexUnlock(&pTrans->mutex);
code = mndTransSync(pMnode, pTrans); code = mndTransSync(pMnode, pTrans);
taosThreadMutexLock(&pTrans->mutex);
if (code != 0) { if (code != 0) {
pTrans->redoActionPos--; pTrans->redoActionPos--;
pTrans->code = terrno; pTrans->code = terrno;
@ -1357,7 +1365,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
return code; return code;
} }
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0; int32_t code = 0;
@ -1368,7 +1376,7 @@ bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions); mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions);
return false; return false;
@ -1381,17 +1389,17 @@ _OVER:
return continueExec; return continueExec;
} }
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0; int32_t code = 0;
if (pTrans->exec == TRN_EXEC_SERIAL) { if (pTrans->exec == TRN_EXEC_SERIAL) {
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans); code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf);
} else { } else {
code = mndTransExecuteRedoActions(pMnode, pTrans); code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
} }
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
terrno = code; terrno = code;
if (code == 0) { if (code == 0) {
@ -1431,8 +1439,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans); int32_t code = mndTransCommit(pMnode, pTrans);
@ -1452,9 +1460,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf);
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
@ -1471,9 +1479,9 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
if (code == 0) { if (code == 0) {
pTrans->stage = TRN_STAGE_PRE_FINISH; pTrans->stage = TRN_STAGE_PRE_FINISH;
@ -1491,8 +1499,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans); int32_t code = mndTransRollback(pMnode, pTrans);
@ -1510,8 +1518,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransPreFinish(pMnode, pTrans); int32_t code = mndTransPreFinish(pMnode, pTrans);
@ -1529,8 +1537,9 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = false; bool continueExec = false;
if (topHalf) return continueExec;
SSdbRaw *pRaw = mndTransEncode(pTrans); SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
@ -1558,43 +1567,28 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
pTrans->lastExecTime = taosGetTimestampMs(); pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) { switch (pTrans->stage) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE:
continueExec = mndTransPerformPrepareStage(pMnode, pTrans); continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_REDO_ACTION: case TRN_STAGE_REDO_ACTION:
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_COMMIT: case TRN_STAGE_COMMIT:
if (topHalf) { continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf);
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not commit since not leader", pTrans->id);
continueExec = false;
}
break; break;
case TRN_STAGE_COMMIT_ACTION: case TRN_STAGE_COMMIT_ACTION:
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_ROLLBACK: case TRN_STAGE_ROLLBACK:
if (topHalf) { continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf);
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not rollback since not leader", pTrans->id);
continueExec = false;
}
break; break;
case TRN_STAGE_UNDO_ACTION: case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_PRE_FINISH: case TRN_STAGE_PRE_FINISH:
if (topHalf) { continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf);
continueExec = mndTransPerformPreFinishStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not pre-finish since not leader", pTrans->id);
continueExec = false;
}
break; break;
case TRN_STAGE_FINISH: case TRN_STAGE_FINISH:
continueExec = mndTransPerformFinishStage(pMnode, pTrans); continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf);
break; break;
default: default:
continueExec = false; continueExec = false;