test: execute trans in follower
This commit is contained in:
parent
3fc793dbe0
commit
33b5efc21f
|
@ -75,6 +75,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||||
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
|
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
|
||||||
void mndTransPullup(SMnode *pMnode);
|
void mndTransPullup(SMnode *pMnode);
|
||||||
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
|
||||||
|
void mndTransExecute(SMnode *pMnode, STrans *pTrans);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,6 +61,12 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
}
|
}
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
} else {
|
} else {
|
||||||
|
STrans *pTrans = mndAcquireTrans(pMnode, transId);
|
||||||
|
if (pTrans != NULL) {
|
||||||
|
mndTransExecute(pMnode, pTrans);
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
}
|
||||||
|
|
||||||
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
|
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
|
||||||
SSnapshotMeta sMeta = {0};
|
SSnapshotMeta sMeta = {0};
|
||||||
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
|
||||||
|
|
|
@ -52,8 +52,8 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
|
||||||
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
|
||||||
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
|
||||||
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
|
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
|
||||||
|
static bool mndCantExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); }
|
||||||
|
|
||||||
static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
|
|
||||||
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
|
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
|
||||||
static int32_t mndProcessTransReq(SRpcMsg *pReq);
|
static int32_t mndProcessTransReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
|
static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
|
||||||
|
@ -517,12 +517,12 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
||||||
|
|
||||||
if (pOld->stage == TRN_STAGE_COMMIT) {
|
if (pOld->stage == TRN_STAGE_COMMIT) {
|
||||||
pOld->stage = TRN_STAGE_COMMIT_ACTION;
|
pOld->stage = TRN_STAGE_COMMIT_ACTION;
|
||||||
mTrace("trans:%d, stage from commit to commitAction", pNew->id);
|
mTrace("trans:%d, stage from commit to commitAction since perform update action", pNew->id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOld->stage == TRN_STAGE_ROLLBACK) {
|
if (pOld->stage == TRN_STAGE_ROLLBACK) {
|
||||||
pOld->stage = TRN_STAGE_FINISHED;
|
pOld->stage = TRN_STAGE_FINISHED;
|
||||||
mTrace("trans:%d, stage from rollback to finished", pNew->id);
|
mTrace("trans:%d, stage from rollback to finished since perform update action", pNew->id);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -914,7 +914,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
|
||||||
|
|
||||||
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||||
if (pAction->msgSent) return 0;
|
if (pAction->msgSent) return 0;
|
||||||
if (!pMnode->deploy && !mndIsMaster(pMnode)) return -1;
|
if (mndCantExecuteTransAction(pMnode)) return -1;
|
||||||
|
|
||||||
int64_t signature = pTrans->id;
|
int64_t signature = pTrans->id;
|
||||||
signature = (signature << 32);
|
signature = (signature << 32);
|
||||||
|
@ -1114,9 +1114,9 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
||||||
pTrans->lastEpset = pAction->epSet;
|
pTrans->lastEpset = pAction->epSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (mndCantExecuteTransAction(pMnode)) break;
|
||||||
if (!pMnode->deploy && !mndIsMaster(pMnode)) break;
|
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
pTrans->code = 0;
|
pTrans->code = 0;
|
||||||
pTrans->redoActionPos++;
|
pTrans->redoActionPos++;
|
||||||
mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
|
mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
|
||||||
|
@ -1160,6 +1160,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
code = mndTransExecuteRedoActions(pMnode, pTrans);
|
code = mndTransExecuteRedoActions(pMnode, pTrans);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mndCantExecuteTransAction(pMnode)) return false;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->code = 0;
|
pTrans->code = 0;
|
||||||
pTrans->stage = TRN_STAGE_COMMIT;
|
pTrans->stage = TRN_STAGE_COMMIT;
|
||||||
|
@ -1185,6 +1187,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
if (mndCantExecuteTransAction(pMnode)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransCommit(pMnode, pTrans);
|
int32_t code = mndTransCommit(pMnode, pTrans);
|
||||||
|
|
||||||
|
@ -1233,6 +1237,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
||||||
|
|
||||||
|
if (mndCantExecuteTransAction(pMnode)) return false;
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
pTrans->stage = TRN_STAGE_ROLLBACK;
|
pTrans->stage = TRN_STAGE_ROLLBACK;
|
||||||
mDebug("trans:%d, stage from undoAction to rollback", pTrans->id);
|
mDebug("trans:%d, stage from undoAction to rollback", pTrans->id);
|
||||||
|
@ -1250,6 +1256,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
if (mndCantExecuteTransAction(pMnode)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransRollback(pMnode, pTrans);
|
int32_t code = mndTransRollback(pMnode, pTrans);
|
||||||
|
|
||||||
|
@ -1284,10 +1292,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
return continueExec;
|
return continueExec;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
|
|
||||||
while (continueExec) {
|
while (continueExec) {
|
||||||
|
mDebug("trans:%d, continue to execute, stage:%s", pTrans->id, mndTransStr(pTrans->stage));
|
||||||
pTrans->lastExecTime = taosGetTimestampMs();
|
pTrans->lastExecTime = taosGetTimestampMs();
|
||||||
switch (pTrans->stage) {
|
switch (pTrans->stage) {
|
||||||
case TRN_STAGE_PREPARE:
|
case TRN_STAGE_PREPARE:
|
||||||
|
|
|
@ -57,8 +57,8 @@
|
||||||
|
|
||||||
# ---- mnode
|
# ---- mnode
|
||||||
./test.sh -f tsim/mnode/basic1.sim
|
./test.sh -f tsim/mnode/basic1.sim
|
||||||
#./test.sh -f tsim/mnode/basic2.sim
|
./test.sh -f tsim/mnode/basic2.sim
|
||||||
./test.sh -f tsim/mnode/basic3.sim
|
#./test.sh -f tsim/mnode/basic3.sim
|
||||||
./test.sh -f tsim/mnode/basic4.sim
|
./test.sh -f tsim/mnode/basic4.sim
|
||||||
|
|
||||||
# ---- show
|
# ---- show
|
||||||
|
|
|
@ -92,6 +92,8 @@ sql show mnodes
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||||
|
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||||
|
|
||||||
sql show users
|
sql show users
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
|
@ -111,6 +113,8 @@ step3:
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql show dnodes -x step3
|
sql show dnodes -x step3
|
||||||
|
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||||
|
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||||
if $data(1)[4] != ready then
|
if $data(1)[4] != ready then
|
||||||
goto step3
|
goto step3
|
||||||
endi
|
endi
|
||||||
|
|
|
@ -39,7 +39,7 @@ endi
|
||||||
print =============== step2: create mnode 2
|
print =============== step2: create mnode 2
|
||||||
sql create mnode on dnode 2
|
sql create mnode on dnode 2
|
||||||
sql create mnode on dnode 3
|
sql create mnode on dnode 3
|
||||||
return
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
|
||||||
sql_error create mnode on dnode 4
|
sql_error create mnode on dnode 4
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue