Merge pull request #24176 from taosdata/FIX/TD-27916-3.0
enh: demarcate top and bottom halves of task trans execution
This commit is contained in:
commit
1a215daefa
|
@ -355,6 +355,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5)
|
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5)
|
||||||
#define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal
|
#define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal
|
||||||
#define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7)
|
#define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7)
|
||||||
|
#define TSDB_CODE_MND_TRANS_CTX_SWITCH TAOS_DEF_ERROR_CODE(0, 0x03D8)
|
||||||
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
|
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
|
||||||
|
|
||||||
// mnode-mq
|
// mnode-mq
|
||||||
|
|
|
@ -108,11 +108,11 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -97,7 +97,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans);
|
||||||
SSdbRow *mndTransDecode(SSdbRaw *pRaw);
|
SSdbRow *mndTransDecode(SSdbRaw *pRaw);
|
||||||
void mndTransDropData(STrans *pTrans);
|
void mndTransDropData(STrans *pTrans);
|
||||||
|
|
||||||
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
|
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -290,11 +290,11 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p
|
||||||
|
|
||||||
SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail);
|
SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail);
|
||||||
if (pVgRaw == NULL) return -1;
|
if (pVgRaw == NULL) return -1;
|
||||||
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) {
|
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
|
||||||
sdbFreeRaw(pVgRaw);
|
sdbFreeRaw(pVgRaw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1215,7 +1215,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
|
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -180,7 +180,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
|
mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
|
||||||
" role:%s raw:%p sec:%d seq:%" PRId64,
|
" role:%s raw:%p sec:%d seq:%" PRId64,
|
||||||
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
|
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
|
||||||
pRaw, pMgmt->transSec, pMgmt->transSeq);
|
pRaw, pMgmt->transSec, pMgmt->transSeq);
|
||||||
|
@ -208,15 +208,11 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->stage == TRN_STAGE_PREPARE) {
|
if (pTrans->stage == TRN_STAGE_PREPARE) {
|
||||||
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
|
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false);
|
||||||
if (!continueExec) goto _OUT;
|
if (!continueExec) goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTrans->id != pMgmt->transId) {
|
mndTransRefresh(pMnode, pTrans);
|
||||||
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
|
|
||||||
pTrans->id, pTrans->createdTime, pMgmt->transId);
|
|
||||||
mndTransRefresh(pMnode, pTrans);
|
|
||||||
}
|
|
||||||
|
|
||||||
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
|
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
|
||||||
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
|
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
|
||||||
|
@ -234,6 +230,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
||||||
goto _OUT;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t transId = pMgmt->transId;
|
||||||
pMgmt->transId = 0;
|
pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
pMgmt->transSeq = 0;
|
||||||
|
@ -241,9 +238,9 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
|
|
||||||
if (pMgmt->errCode != 0) {
|
if (pMgmt->errCode != 0) {
|
||||||
mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode));
|
mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
|
||||||
} else {
|
} else {
|
||||||
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq);
|
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
|
||||||
}
|
}
|
||||||
|
|
||||||
_OUT:
|
_OUT:
|
||||||
|
@ -542,7 +539,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
taosThreadMutexLock(&pMgmt->lock);
|
taosThreadMutexLock(&pMgmt->lock);
|
||||||
pMgmt->errCode = 0;
|
pMgmt->errCode = 0;
|
||||||
|
|
||||||
if (pMgmt->transId != 0 /* && pMgmt->transId != transId*/) {
|
if (pMgmt->transId != 0) {
|
||||||
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
|
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
|
||||||
taosThreadMutexUnlock(&pMgmt->lock);
|
taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
rpcFreeCont(req.pCont);
|
rpcFreeCont(req.pCont);
|
||||||
|
|
|
@ -36,21 +36,25 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
|
||||||
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
|
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
|
||||||
static void mndTransDropLogs(SArray *pArray);
|
static void mndTransDropLogs(SArray *pArray);
|
||||||
static void mndTransDropActions(SArray *pArray);
|
static void mndTransDropActions(SArray *pArray);
|
||||||
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
|
|
||||||
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
|
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf);
|
||||||
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
|
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
|
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
|
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans);
|
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
|
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); }
|
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
|
||||||
|
|
||||||
|
static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) {
|
||||||
|
return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf;
|
||||||
|
}
|
||||||
|
|
||||||
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
|
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
|
||||||
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
|
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
|
||||||
|
@ -1090,8 +1094,9 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
||||||
if (pAction->rawWritten) return 0;
|
if (pAction->rawWritten) return 0;
|
||||||
|
if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
|
|
||||||
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
|
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
|
||||||
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||||
|
@ -1112,9 +1117,9 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
||||||
if (pAction->msgSent) return 0;
|
if (pAction->msgSent) return 0;
|
||||||
if (mndCannotExecuteTransAction(pMnode)) return -1;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
|
|
||||||
int64_t signature = pTrans->id;
|
int64_t signature = pTrans->id;
|
||||||
signature = (signature << 32);
|
signature = (signature << 32);
|
||||||
|
@ -1159,7 +1164,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
||||||
|
if (!topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
|
||||||
pAction->rawWritten = 0;
|
pAction->rawWritten = 0;
|
||||||
pAction->errCode = 0;
|
pAction->errCode = 0;
|
||||||
mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
||||||
|
@ -1168,34 +1174,39 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
|
||||||
if (pAction->actionType == TRANS_ACTION_RAW) {
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||||
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
|
return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf);
|
||||||
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
} else if (pAction->actionType == TRANS_ACTION_MSG) {
|
||||||
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
|
return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf);
|
||||||
} else {
|
} else {
|
||||||
return mndTransExecNullMsg(pMnode, pTrans, pAction);
|
return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
|
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
|
||||||
int32_t numOfActions = taosArrayGetSize(pArray);
|
int32_t numOfActions = taosArrayGetSize(pArray);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
for (int32_t action = 0; action < numOfActions; ++action) {
|
for (int32_t action = 0; action < numOfActions; ++action) {
|
||||||
STransAction *pAction = taosArrayGet(pArray, action);
|
STransAction *pAction = taosArrayGet(pArray, action);
|
||||||
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
|
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
|
||||||
if (code != 0) break;
|
if (code != 0) {
|
||||||
|
mInfo("trans:%d, action:%d not executed since %s. numOfActions:%d", pTrans->id, action, tstrerror(code),
|
||||||
|
numOfActions);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
|
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
|
||||||
int32_t numOfActions = taosArrayGetSize(pArray);
|
int32_t numOfActions = taosArrayGetSize(pArray);
|
||||||
|
int32_t code = 0;
|
||||||
if (numOfActions == 0) return 0;
|
if (numOfActions == 0) return 0;
|
||||||
|
|
||||||
if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) {
|
if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1248,31 +1259,31 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
|
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
|
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
|
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
|
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
|
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("failed to execute undoActions since %s", terrstr());
|
mError("failed to execute undoActions since %s", terrstr());
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
|
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions);
|
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("failed to execute commitActions since %s", terrstr());
|
mError("failed to execute commitActions since %s", terrstr());
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) {
|
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
|
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
|
||||||
if (numOfActions == 0) return code;
|
if (numOfActions == 0) return code;
|
||||||
|
@ -1289,7 +1300,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
||||||
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
|
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
|
||||||
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
|
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
|
||||||
|
|
||||||
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
|
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (pAction->msgSent) {
|
if (pAction->msgSent) {
|
||||||
if (pAction->msgReceived) {
|
if (pAction->msgReceived) {
|
||||||
|
@ -1317,14 +1328,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
||||||
}
|
}
|
||||||
mndSetTransLastAction(pTrans, pAction);
|
mndSetTransLastAction(pTrans, pAction);
|
||||||
|
|
||||||
if (mndCannotExecuteTransAction(pMnode)) break;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) break;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->code = 0;
|
pTrans->code = 0;
|
||||||
pTrans->redoActionPos++;
|
pTrans->redoActionPos++;
|
||||||
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
|
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
|
||||||
pAction->id);
|
pAction->id);
|
||||||
|
taosThreadMutexUnlock(&pTrans->mutex);
|
||||||
code = mndTransSync(pMnode, pTrans);
|
code = mndTransSync(pMnode, pTrans);
|
||||||
|
taosThreadMutexLock(&pTrans->mutex);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
pTrans->redoActionPos--;
|
pTrans->redoActionPos--;
|
||||||
pTrans->code = terrno;
|
pTrans->code = terrno;
|
||||||
|
@ -1357,7 +1370,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
|
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -1368,7 +1381,7 @@ bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
|
||||||
for (int32_t action = 0; action < numOfActions; ++action) {
|
for (int32_t action = 0; action < numOfActions; ++action) {
|
||||||
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
|
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
|
||||||
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
|
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions);
|
mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions);
|
||||||
return false;
|
return false;
|
||||||
|
@ -1381,17 +1394,17 @@ _OVER:
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pTrans->exec == TRN_EXEC_SERIAL) {
|
if (pTrans->exec == TRN_EXEC_SERIAL) {
|
||||||
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans);
|
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf);
|
||||||
} else {
|
} else {
|
||||||
code = mndTransExecuteRedoActions(pMnode, pTrans);
|
code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCannotExecuteTransAction(pMnode)) return false;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
@ -1431,8 +1444,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
if (mndCannotExecuteTransAction(pMnode)) return false;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransCommit(pMnode, pTrans);
|
int32_t code = mndTransCommit(pMnode, pTrans);
|
||||||
|
@ -1452,9 +1465,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransExecuteCommitActions(pMnode, pTrans);
|
int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf);
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->code = 0;
|
pTrans->code = 0;
|
||||||
|
@ -1471,9 +1484,9 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->stage = TRN_STAGE_PRE_FINISH;
|
pTrans->stage = TRN_STAGE_PRE_FINISH;
|
||||||
|
@ -1491,8 +1504,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
if (mndCannotExecuteTransAction(pMnode)) return false;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransRollback(pMnode, pTrans);
|
int32_t code = mndTransRollback(pMnode, pTrans);
|
||||||
|
@ -1510,8 +1523,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
if (mndCannotExecuteTransAction(pMnode)) return false;
|
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransPreFinish(pMnode, pTrans);
|
int32_t code = mndTransPreFinish(pMnode, pTrans);
|
||||||
|
@ -1529,8 +1542,9 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
bool continueExec = false;
|
bool continueExec = false;
|
||||||
|
if (topHalf) return continueExec;
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndTransEncode(pTrans);
|
SSdbRaw *pRaw = mndTransEncode(pTrans);
|
||||||
if (pRaw == NULL) {
|
if (pRaw == NULL) {
|
||||||
|
@ -1558,43 +1572,28 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
|
||||||
pTrans->lastExecTime = taosGetTimestampMs();
|
pTrans->lastExecTime = taosGetTimestampMs();
|
||||||
switch (pTrans->stage) {
|
switch (pTrans->stage) {
|
||||||
case TRN_STAGE_PREPARE:
|
case TRN_STAGE_PREPARE:
|
||||||
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
|
continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf);
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_REDO_ACTION:
|
case TRN_STAGE_REDO_ACTION:
|
||||||
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
|
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf);
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_COMMIT:
|
case TRN_STAGE_COMMIT:
|
||||||
if (topHalf) {
|
continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf);
|
||||||
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
|
|
||||||
} else {
|
|
||||||
mInfo("trans:%d, can not commit since not leader", pTrans->id);
|
|
||||||
continueExec = false;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_COMMIT_ACTION:
|
case TRN_STAGE_COMMIT_ACTION:
|
||||||
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
|
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf);
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_ROLLBACK:
|
case TRN_STAGE_ROLLBACK:
|
||||||
if (topHalf) {
|
continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf);
|
||||||
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
|
|
||||||
} else {
|
|
||||||
mInfo("trans:%d, can not rollback since not leader", pTrans->id);
|
|
||||||
continueExec = false;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_UNDO_ACTION:
|
case TRN_STAGE_UNDO_ACTION:
|
||||||
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
|
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf);
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_PRE_FINISH:
|
case TRN_STAGE_PRE_FINISH:
|
||||||
if (topHalf) {
|
continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf);
|
||||||
continueExec = mndTransPerformPreFinishStage(pMnode, pTrans);
|
|
||||||
} else {
|
|
||||||
mInfo("trans:%d, can not pre-finish since not leader", pTrans->id);
|
|
||||||
continueExec = false;
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case TRN_STAGE_FINISH:
|
case TRN_STAGE_FINISH:
|
||||||
continueExec = mndTransPerformFinishStage(pMnode, pTrans);
|
continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
continueExec = false;
|
continueExec = false;
|
||||||
|
|
|
@ -286,6 +286,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CTX_SWITCH, "Transaction context switch")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
|
||||||
|
|
||||||
// mnode-mq
|
// mnode-mq
|
||||||
|
|
Loading…
Reference in New Issue