minor changes
This commit is contained in:
parent
e9bfd8a4bb
commit
419323274c
|
@ -191,6 +191,8 @@ void trnDrop(STrans *pTrans) {
|
|||
tfree(pTrans);
|
||||
}
|
||||
|
||||
void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcHandle; }
|
||||
|
||||
static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
|
||||
if (pArray == NULL || pRaw == NULL) {
|
||||
terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
|
|
@ -17,13 +17,6 @@
|
|||
#include "trnInt.h"
|
||||
#include "trpc.h"
|
||||
|
||||
void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) {
|
||||
if (pTrans != NULL) {
|
||||
pTrans->rpcHandle = rpcHandle;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
|
||||
if (syncfp == NULL) return -1;
|
||||
|
||||
|
@ -69,58 +62,128 @@ int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t trnExecuteRedoLogs(STrans *pTrans) {return 0;}
|
||||
int32_t trnExecuteUndoLogs(STrans *pTrans) {return 0;}
|
||||
int32_t trnExecuteCommitLogs(STrans *pTrans) {return 0;}
|
||||
int32_t trnExecuteRedoActions(STrans *pTrans) {return 0;}
|
||||
int32_t trnExecuteUndoActions(STrans *pTrans) {return 0;}
|
||||
static int32_t trnPerfomRollbackStage(STrans *pTrans) { return 0; }
|
||||
static int32_t trnExecuteArray(SArray *pArray) {
|
||||
for (int32_t index = 0; index < pArray->size; ++index) {
|
||||
SSdbRaw *pRaw = taosArrayGetP(pArray, index);
|
||||
if (sdbWrite(pRaw) != 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); }
|
||||
|
||||
static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); }
|
||||
|
||||
static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); }
|
||||
|
||||
static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); }
|
||||
|
||||
static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); }
|
||||
|
||||
static int32_t trnPerformPrepareStage(STrans *pTrans) {
|
||||
if (trnExecuteRedoLogs(pTrans) == 0) {
|
||||
pTrans->stage = TRN_STAGE_EXECUTE;
|
||||
return 0;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t trnPerformExecuteStage(STrans *pTrans) {
|
||||
int32_t code = trnExecuteRedoActions(pTrans);
|
||||
|
||||
if (code == 0) {
|
||||
pTrans->stage = TRN_STAGE_COMMIT;
|
||||
return 0;
|
||||
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
return -1;
|
||||
} else {
|
||||
if (pTrans->policy == TRN_POLICY_RETRY) {
|
||||
pTrans->stage = TRN_STAGE_RETRY;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t trnPerformCommitStage(STrans *pTrans) {
|
||||
if (trnExecuteCommitLogs(pTrans) == 0) {
|
||||
pTrans->stage = TRN_STAGE_EXECUTE;
|
||||
return 0;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t trnPerformRollbackStage(STrans *pTrans) {
|
||||
if (trnExecuteCommitLogs(pTrans) == 0) {
|
||||
pTrans->stage = TRN_STAGE_EXECUTE;
|
||||
return 0;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t trnPerformRetryStage(STrans *pTrans) {
|
||||
if (trnExecuteCommitLogs(pTrans) == 0) {
|
||||
pTrans->stage = TRN_STAGE_EXECUTE;
|
||||
return 0;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t trnExecute(int32_t tranId) {
|
||||
int32_t code = 0;
|
||||
|
||||
STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
|
||||
if (pTrans == NULL) {
|
||||
code = terrno;
|
||||
return code;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_PREPARE) {
|
||||
code = trnExecuteRedoLogs(pTrans);
|
||||
if (code == 0) {
|
||||
pTrans->stage = TRN_STAGE_EXECUTE;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
if (trnPerformPrepareStage(pTrans) != 0) {
|
||||
sdbRelease(pTrans);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_EXECUTE) {
|
||||
code = trnExecuteRedoActions(pTrans);
|
||||
if (code == 0) {
|
||||
pTrans->stage = TRN_STAGE_COMMIT;
|
||||
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
// do nothing
|
||||
} else {
|
||||
if (pTrans->policy == TRN_POLICY_RETRY) {
|
||||
pTrans->stage = TRN_STAGE_RETRY;
|
||||
} else {
|
||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||
}
|
||||
if (trnPerformExecuteStage(pTrans) != 0) {
|
||||
sdbRelease(pTrans);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_COMMIT) {
|
||||
code = trnExecuteCommitLogs(pTrans);
|
||||
if (code == 0) {
|
||||
trnDrop(pTrans);
|
||||
if (trnPerformCommitStage(pTrans) != 0) {
|
||||
sdbRelease(pTrans);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_ROLLBACK) {
|
||||
if (trnPerformRollbackStage(pTrans) != 0) {
|
||||
sdbRelease(pTrans);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_RETRY) {
|
||||
if (trnPerformRetryStage(pTrans) != 0) {
|
||||
sdbRelease(pTrans);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pTrans);
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue