refact trans

This commit is contained in:
Shengliang Guan 2021-12-26 23:46:22 +08:00
parent 3052e8e154
commit 0d11825d38
12 changed files with 323 additions and 197 deletions

View File

@ -655,6 +655,10 @@ typedef struct {
SVnodeLoads vnodeLoads;
} SStatusMsg;
typedef struct {
int32_t reserved;
} STransMsg;
typedef struct {
int32_t dnodeId;
int32_t clusterId;

View File

@ -114,6 +114,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL)

View File

@ -118,6 +118,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
// message from dnode to mnode
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;

View File

@ -62,10 +62,14 @@ typedef enum {
typedef enum {
TRN_STAGE_PREPARE = 0,
TRN_STAGE_EXECUTE = 1,
TRN_STAGE_ROLLBACK = 2,
TRN_STAGE_COMMIT = 3,
TRN_STAGE_OVER = 4,
TRN_STAGE_REDO_LOG = 1,
TRN_STAGE_REDO_ACTION = 2,
TRN_STAGE_UNDO_LOG = 3,
TRN_STAGE_UNDO_ACTION = 4,
TRN_STAGE_COMMIT_LOG = 5,
TRN_STAGE_COMMIT = 6,
TRN_STAGE_ROLLBACK = 7,
TRN_STAGE_FINISHED = 8
} ETrnStage;
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
@ -95,7 +99,8 @@ typedef struct {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
int32_t retryTimes;
int32_t code;
int32_t failedTimes;
void *rpcHandle;
void *rpcAHandle;
SArray *redoLogs;
@ -313,12 +318,6 @@ typedef struct SMnodeMsg {
void *pCont;
} SMnodeMsg;
typedef struct {
int32_t id;
int32_t code;
void *rpcHandle;
} STransMsg;
#ifdef __cplusplus
}
#endif

View File

@ -75,6 +75,7 @@ typedef struct SMnode {
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer;
tmr_h transTimer;
char *path;
SMnodeCfg cfg;
int64_t checkTime;

View File

@ -44,11 +44,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
void mndTransHandleActionRsp(SMnodeMsg *pMsg);
char *mndTransStageStr(ETrnStage stage);
char *mndTransPolicyStr(ETrnPolicy policy);
void mndTransProcessRsp(SMnodeMsg *pMsg);
#ifdef __cplusplus
}

View File

@ -567,17 +567,17 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
}
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}

View File

@ -552,7 +552,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) {
}
static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
@ -616,7 +616,7 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) {
}
static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
@ -728,7 +728,7 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) {
}
static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}

View File

@ -524,7 +524,7 @@ static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) {
}
static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
@ -636,7 +636,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) {
}
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
@ -706,7 +706,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
}
static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}

View File

@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray);
@ -36,14 +35,22 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
static void mndTransSendRpcRsp(STrans *pTrans);
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg);
int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS,
@ -54,6 +61,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg);
return sdbSetTable(pMnode->pSdb, table);
}
@ -290,12 +298,12 @@ TRANS_DECODE_OVER:
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
mTrace("trans:%d, perform insert action", pTrans->id);
return 0;
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
mTrace("trans:%d, perform delete action", pTrans->id);
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
@ -307,7 +315,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage));
mTrace("trans:%d, perform update action", pOldTrans->id);
pOldTrans->stage = pNewTrans->stage;
return 0;
}
@ -326,34 +334,6 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
sdbRelease(pSdb, pTrans);
}
char *mndTransStageStr(ETrnStage stage) {
switch (stage) {
case TRN_STAGE_PREPARE:
return "prepare";
case TRN_STAGE_EXECUTE:
return "execute";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_ROLLBACK:
return "rollback";
case TRN_STAGE_OVER:
return "over";
default:
return "undefined";
}
}
char *mndTransPolicyStr(ETrnPolicy policy) {
switch (policy) {
case TRN_POLICY_ROLLBACK:
return "prepare";
case TRN_POLICY_RETRY:
return "retry";
default:
return "undefined";
}
}
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
STrans *pTrans = calloc(1, sizeof(STrans));
if (pTrans == NULL) {
@ -428,23 +408,11 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
return 0;
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw);
mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); }
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw);
mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); }
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw);
mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code);
return code;
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); }
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
void *ptr = taosArrayPush(pArray, pAction);
@ -457,20 +425,14 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
int32_t code = mndTransAppendAction(pTrans->redoActions, pAction);
mTrace("trans:%d, msg:%s append to redo actions, code:0x%x", pTrans->id, TMSG_INFO(pAction->msgType), code);
return code;
return mndTransAppendAction(pTrans->redoActions, pAction);
}
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
int32_t code = mndTransAppendAction(pTrans->undoActions, pAction);
mTrace("trans:%d, msg:%s append to undo actions, code:0x%x", pTrans->id, TMSG_INFO(pAction->msgType), code);
return code;
return mndTransAppendAction(pTrans->undoActions, pAction);
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, prepare transaction", pTrans->id);
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
@ -494,6 +456,17 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
return 0;
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, prepare transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, prepare finished", pTrans->id);
STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id);
if (pNewTrans == NULL) {
mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
@ -507,84 +480,41 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return 0;
}
int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0;
mDebug("trans:%d, commit transaction", pTrans->id);
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
return -1;
}
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
if (taosArrayGetSize(pTrans->commitLogs) != 0) {
mTrace("trans:%d, sync to other nodes", pTrans->id);
if (mndSyncPropose(pMnode, pRaw) != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
sdbFreeRaw(pRaw);
return -1;
}
mTrace("trans:%d, sync finished", pTrans->id);
}
if (sdbWrite(pMnode->pSdb, pRaw) != 0) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, commit finished", pTrans->id);
return 0;
}
int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, rollback transaction", pTrans->id);
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
return -1;
}
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
mTrace("trans:%d, sync to other nodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
sdbFreeRaw(pRaw);
return -1;
}
mTrace("trans:%d, sync finished", pTrans->id);
code = sdbWrite(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
return -1;
}
mDebug("trans:%d, rollback finished", pTrans->id);
return 0;
}
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) {
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
mDebug("trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x", pTrans->id, pTrans->rpcHandle, pTrans->rpcAHandle,
code & 0xFFFF);
static void mndTransSendRpcRsp(STrans *pTrans) {
if (pTrans->rpcHandle != NULL) {
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code, .ahandle = pTrans->rpcAHandle};
mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle};
rpcSendResponse(&rspMsg);
}
}
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) {
// todo
}
void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
void mndTransProcessRsp(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
int64_t sig = (int64_t)(pMsg->rpcMsg.ahandle);
int32_t transId = (int32_t)(sig >> 32);
int32_t action = (int32_t)((sig << 32) >> 32);
int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle);
int32_t transId = (int32_t)(signature >> 32);
int32_t action = (int32_t)((signature << 32) >> 32);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
@ -593,15 +523,17 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) {
}
SArray *pArray = NULL;
if (pTrans->stage == TRN_STAGE_EXECUTE) {
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
pArray = pTrans->redoActions;
} else if (pTrans->stage == TRN_STAGE_ROLLBACK) {
} else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
pArray = pTrans->undoActions;
} else {
mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage);
goto HANDLE_ACTION_RSP_OVER;
}
if (pArray == NULL) {
mError("trans:%d, invalid trans stage:%s", transId, mndTransStageStr(pTrans->stage));
mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage);
goto HANDLE_ACTION_RSP_OVER;
}
@ -653,15 +585,27 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteLogs(pMnode, pTrans->commitLogs);
}
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
}
}
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
// if (pAction->msgSent && !pAction->msgReceived) continue;
// if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;
if (pAction->msgSent) continue;
int64_t signature = pTrans->id;
@ -684,6 +628,17 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg);
}
return 0;
}
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0;
if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) {
return -1;
}
int32_t numOfReceived = 0;
int32_t errCode = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
@ -698,9 +653,15 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
}
if (numOfReceived == numOfActions) {
mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode);
terrno = errCode;
return errCode;
if (errCode == 0) {
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0;
} else {
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode);
mndTransResetActions(pMnode, pTrans, pArray);
terrno = errCode;
return errCode;
}
} else {
mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
@ -715,88 +676,229 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
}
static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_LOG;
mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);
return continueExec;
}
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
mDebug("trans:%d, stage from prepare to execute", pTrans->id);
pTrans->code = 0;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
pTrans->code = terrno;
pTrans->stage = TRN_STAGE_UNDO_LOG;
mError("trans:%d, stage from redoLog to undoLog", pTrans->id);
}
return continueExec;
}
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT;
mDebug("trans:%d, stage from execute to commit", pTrans->id);
mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code));
mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
continueExec = false;
} else {
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
pTrans->stage = TRN_STAGE_UNDO_ACTION;
mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr());
continueExec = true;
} else {
pTrans->stage = TRN_STAGE_EXECUTE;
pTrans->retryTimes++;
mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr());
pTrans->failedTimes++;
mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
}
return code;
return continueExec;
}
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
mndTransExecuteCommitLogs(pMnode, pTrans);
pTrans->stage = TRN_STAGE_OVER;
return 0;
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT_LOG;
mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_REDO_ACTION;
mError("trans:%d, stage from commit to redoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
pTrans->failedTimes);
continueExec = true;
} else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
}
return continueExec;
}
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
pTrans->failedTimes++;
mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
continueExec = false;
;
}
return continueExec;
}
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
continueExec = true;
} else {
mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
continueExec = false;
}
return continueExec;
}
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
if (code == 0) {
mDebug("trans:%d, rollbacked", pTrans->id);
pTrans->stage = TRN_STAGE_REDO_LOG;
mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
continueExec = false;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
pTrans->retryTimes++;
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
pTrans->failedTimes++;
mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
continueExec = false;
}
return code;
return continueExec;
}
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from rollback to finished", pTrans->id);
continueExec = true;
;
} else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
continueExec = false;
}
return continueExec;
}
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = false;
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
}
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
int32_t code = sdbWrite(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
}
mError("trans:%d, exec finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
return continueExec;
}
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
bool continueExec = true;
while (code == 0) {
while (continueExec) {
switch (pTrans->stage) {
case TRN_STAGE_PREPARE:
mndTransPerformPrepareStage(pMnode, pTrans);
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
break;
case TRN_STAGE_EXECUTE:
code = mndTransPerformExecuteStage(pMnode, pTrans);
case TRN_STAGE_REDO_LOG:
continueExec = mndTransPerformRedoLogStage(pMnode, pTrans);
break;
case TRN_STAGE_REDO_ACTION:
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_LOG:
continueExec = mndTransPerformUndoLogStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT_LOG:
continueExec = mndTransPerformCommitLogStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT:
code = mndTransCommit(pMnode, pTrans);
if (code == 0) {
mndTransPerformCommitStage(pMnode, pTrans);
}
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK:
code = mndTransRollback(pMnode, pTrans);
if (code == 0) {
mndTransPerformRollbackStage(pMnode, pTrans);
}
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
break;
case TRN_STAGE_FINISHED:
continueExec = mndTransPerfromFinishedStage(pMnode, pTrans);
break;
default:
mndTransSendRpcRsp(pTrans, 0);
return;
continueExec = false;
break;
}
}
mndTransSendRpcRsp(pTrans, code);
if (pTrans->stage == TRN_STAGE_FINISHED) {
mndTransSendRpcRsp(pTrans);
}
}
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
STrans *pTrans = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) break;
mndTransExecute(pMnode, pTrans);
sdbRelease(pMnode->pSdb, pTrans);
}
}

View File

@ -333,17 +333,17 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) {
}
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) {
mndTransHandleActionRsp(pMsg);
mndTransProcessRsp(pMsg);
return 0;
}

View File

@ -50,6 +50,20 @@ void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) {
}
}
static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg));
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
epSet.port[0] = pMnode->replicas[pMnode->selfIndex].port;
memcpy(epSet.fqdn[0], pMnode->replicas[pMnode->selfIndex].fqdn, TSDB_FQDN_LEN);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)};
mndSendMsgToDnode(pMnode, &epSet, &rpcMsg);
}
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
static int32_t mndInitTimer(SMnode *pMnode) {
if (pMnode->timer == NULL) {
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
@ -60,11 +74,18 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1;
}
if (taosTmrReset(mndTransReExecute, 1000, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void mndCleanupTimer(SMnode *pMnode) {
if (pMnode->timer != NULL) {
taosTmrStop(pMnode->transTimer);
pMnode->transTimer = NULL;
taosTmrCleanUp(pMnode->timer);
pMnode->timer = NULL;
}