diff --git a/source/dnode/mnode/transaction/src/trnInt.c b/source/dnode/mnode/transaction/src/trnInt.c index 03c1a506b9..f7463ec369 100644 --- a/source/dnode/mnode/transaction/src/trnInt.c +++ b/source/dnode/mnode/transaction/src/trnInt.c @@ -191,6 +191,8 @@ void trnDrop(STrans *pTrans) { tfree(pTrans); } +void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcHandle; } + static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { terrno = TSDB_CODE_MND_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/transaction/src/trnMain.c b/source/dnode/mnode/transaction/src/trnMain.c index 1f8da2689e..6bb46f1283 100644 --- a/source/dnode/mnode/transaction/src/trnMain.c +++ b/source/dnode/mnode/transaction/src/trnMain.c @@ -17,13 +17,6 @@ #include "trnInt.h" #include "trpc.h" -void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { - if (pTrans != NULL) { - pTrans->rpcHandle = rpcHandle; - } -} - - int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) { if (syncfp == NULL) return -1; @@ -69,58 +62,128 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) { return 0; } -int32_t trnExecuteRedoLogs(STrans *pTrans) {return 0;} -int32_t trnExecuteUndoLogs(STrans *pTrans) {return 0;} -int32_t trnExecuteCommitLogs(STrans *pTrans) {return 0;} -int32_t trnExecuteRedoActions(STrans *pTrans) {return 0;} -int32_t trnExecuteUndoActions(STrans *pTrans) {return 0;} -static int32_t trnPerfomRollbackStage(STrans *pTrans) { return 0; } +static int32_t trnExecuteArray(SArray *pArray) { + for (int32_t index = 0; index < pArray->size; ++index) { + SSdbRaw *pRaw = taosArrayGetP(pArray, index); + if (sdbWrite(pRaw) != 0) { + return -1; + } + } + + return 0; +} + +static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); } + +static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); } + +static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); } + +static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); } + +static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); } + +static int32_t trnPerformPrepareStage(STrans *pTrans) { + if (trnExecuteRedoLogs(pTrans) == 0) { + pTrans->stage = TRN_STAGE_EXECUTE; + return 0; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + return -1; + } +} + +static int32_t trnPerformExecuteStage(STrans *pTrans) { + int32_t code = trnExecuteRedoActions(pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_COMMIT; + return 0; + } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + return -1; + } else { + if (pTrans->policy == TRN_POLICY_RETRY) { + pTrans->stage = TRN_STAGE_RETRY; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + } + return 0; + } +} + +static int32_t trnPerformCommitStage(STrans *pTrans) { + if (trnExecuteCommitLogs(pTrans) == 0) { + pTrans->stage = TRN_STAGE_EXECUTE; + return 0; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + return -1; + } +} + +static int32_t trnPerformRollbackStage(STrans *pTrans) { + if (trnExecuteCommitLogs(pTrans) == 0) { + pTrans->stage = TRN_STAGE_EXECUTE; + return 0; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + return -1; + } +} + +static int32_t trnPerformRetryStage(STrans *pTrans) { + if (trnExecuteCommitLogs(pTrans) == 0) { + pTrans->stage = TRN_STAGE_EXECUTE; + return 0; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + return -1; + } +} int32_t trnExecute(int32_t tranId) { int32_t code = 0; STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId); if (pTrans == NULL) { - code = terrno; - return code; + return -1; } if (pTrans->stage == TRN_STAGE_PREPARE) { - code = trnExecuteRedoLogs(pTrans); - if (code == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - } else { - pTrans->stage = TRN_STAGE_ROLLBACK; + if (trnPerformPrepareStage(pTrans) != 0) { + sdbRelease(pTrans); + return -1; } } if (pTrans->stage == TRN_STAGE_EXECUTE) { - code = trnExecuteRedoActions(pTrans); - if (code == 0) { - pTrans->stage = TRN_STAGE_COMMIT; - } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - // do nothing - } else { - if (pTrans->policy == TRN_POLICY_RETRY) { - pTrans->stage = TRN_STAGE_RETRY; - } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - } + if (trnPerformExecuteStage(pTrans) != 0) { + sdbRelease(pTrans); + return -1; } } if (pTrans->stage == TRN_STAGE_COMMIT) { - code = trnExecuteCommitLogs(pTrans); - if (code == 0) { - trnDrop(pTrans); + if (trnPerformCommitStage(pTrans) != 0) { + sdbRelease(pTrans); + return -1; } } if (pTrans->stage == TRN_STAGE_ROLLBACK) { + if (trnPerformRollbackStage(pTrans) != 0) { + sdbRelease(pTrans); + return -1; + } } if (pTrans->stage == TRN_STAGE_RETRY) { + if (trnPerformRetryStage(pTrans) != 0) { + sdbRelease(pTrans); + return -1; + } } + sdbRelease(pTrans); return 0; } \ No newline at end of file