enh: add retryCode for transaction
This commit is contained in:
parent
fb50977b44
commit
007ebbd54a
|
@ -39,8 +39,10 @@ typedef struct {
|
|||
int32_t id;
|
||||
int32_t errCode;
|
||||
int32_t acceptableCode;
|
||||
ETrnStage stage;
|
||||
int32_t retryCode;
|
||||
ETrnAct actionType;
|
||||
ETrnStage stage;
|
||||
int8_t reserved;
|
||||
int8_t rawWritten;
|
||||
int8_t msgSent;
|
||||
int8_t msgReceived;
|
||||
|
|
|
@ -15,15 +15,15 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndTrans.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSync.h"
|
||||
#include "mndUser.h"
|
||||
|
||||
#define TRANS_VER_NUMBER 1
|
||||
#define TRANS_ARRAY_SIZE 8
|
||||
#define TRANS_VER_NUMBER 1
|
||||
#define TRANS_ARRAY_SIZE 8
|
||||
#define TRANS_RESERVE_SIZE 64
|
||||
|
||||
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
|
||||
|
@ -55,7 +55,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
|
|||
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); }
|
||||
|
||||
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
|
||||
static int32_t mndProcessTransReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
|
||||
static int32_t mndProcessTtl(SRpcMsg *pReq);
|
||||
static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
|
||||
|
||||
|
@ -73,7 +73,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
|
|||
.deleteFp = (SdbDeleteFp)mndTransActionDelete,
|
||||
};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransTimer);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
||||
|
@ -139,8 +139,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
|
||||
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||
|
@ -163,8 +165,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
|
||||
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||
|
@ -187,8 +191,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
|
||||
if (pAction->actionType == TRANS_ACTION_RAW) {
|
||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||
|
@ -291,10 +297,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
||||
action.actionType = actionType;
|
||||
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
|
||||
action.stage = stage;
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
|
||||
if (action.actionType == TRANS_ACTION_RAW) {
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
|
@ -324,10 +332,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
||||
action.actionType = actionType;
|
||||
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
|
||||
action.stage = stage;
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
|
||||
if (action.actionType == TRANS_ACTION_RAW) {
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
|
@ -357,10 +367,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
||||
action.actionType = actionType;
|
||||
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
|
||||
action.stage = stage;
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
|
||||
if (action.actionType) {
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
|
@ -463,15 +475,25 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
|||
if (fp) {
|
||||
(*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
|
||||
}
|
||||
pTrans->startFunc = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void mndTransDropData(STrans *pTrans) {
|
||||
mndTransDropActions(pTrans->redoActions);
|
||||
mndTransDropActions(pTrans->undoActions);
|
||||
mndTransDropActions(pTrans->commitActions);
|
||||
if (pTrans->redoActions != NULL) {
|
||||
mndTransDropActions(pTrans->redoActions);
|
||||
pTrans->redoActions = NULL;
|
||||
}
|
||||
if (pTrans->undoActions != NULL) {
|
||||
mndTransDropActions(pTrans->undoActions);
|
||||
pTrans->undoActions = NULL;
|
||||
}
|
||||
if (pTrans->commitActions != NULL) {
|
||||
mndTransDropActions(pTrans->commitActions);
|
||||
pTrans->commitActions = NULL;
|
||||
}
|
||||
if (pTrans->rpcRsp != NULL) {
|
||||
taosMemoryFree(pTrans->rpcRsp);
|
||||
pTrans->rpcRsp = NULL;
|
||||
|
@ -492,6 +514,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
|
|||
if (fp) {
|
||||
(*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
|
||||
}
|
||||
pTrans->stopFunc = 0;
|
||||
}
|
||||
|
||||
mndTransDropData(pTrans);
|
||||
|
@ -805,7 +828,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|||
sendRsp = true;
|
||||
}
|
||||
} else {
|
||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 3) {
|
||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 6) {
|
||||
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
||||
sendRsp = true;
|
||||
}
|
||||
|
@ -875,8 +898,8 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
|||
pAction->errCode = pRsp->code;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action,
|
||||
pRsp->code, pAction->acceptableCode);
|
||||
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
|
||||
mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
|
||||
_OVER:
|
||||
|
@ -884,6 +907,21 @@ _OVER:
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||
pAction->rawWritten = 0;
|
||||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
|
||||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
|
||||
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
|
||||
pAction->id, pAction->epSet.inUse);
|
||||
} else {
|
||||
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
||||
}
|
||||
pAction->errCode = 0;
|
||||
}
|
||||
|
||||
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
|
||||
int32_t numOfActions = taosArrayGetSize(pArray);
|
||||
|
||||
|
@ -894,18 +932,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
|
|||
continue;
|
||||
if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue;
|
||||
|
||||
pAction->rawWritten = 0;
|
||||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
|
||||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
|
||||
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
|
||||
action, pAction->epSet.inUse);
|
||||
} else {
|
||||
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
|
||||
}
|
||||
pAction->errCode = 0;
|
||||
mndTransResetAction(pMnode, pTrans, pAction);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1112,9 +1139,9 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
|||
if (pAction->msgReceived) {
|
||||
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
||||
code = pAction->errCode;
|
||||
pAction->msgSent = 0;
|
||||
pAction->msgReceived = 0;
|
||||
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
|
||||
mndTransResetAction(pMnode, pTrans, pAction);
|
||||
} else {
|
||||
mDebug("trans:%d, %s:%d execute successfully", pTrans->id, mndTransStr(pAction->stage), action);
|
||||
}
|
||||
} else {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
@ -1123,6 +1150,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
|||
if (pAction->rawWritten) {
|
||||
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
||||
code = pAction->errCode;
|
||||
} else {
|
||||
mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1156,9 +1185,15 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
|||
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
||||
break;
|
||||
} else if (code == pAction->retryCode) {
|
||||
mDebug("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code);
|
||||
taosMsleep(300);
|
||||
continue;
|
||||
} else {
|
||||
terrno = code;
|
||||
pTrans->code = code;
|
||||
mDebug("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id,
|
||||
mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1343,7 +1378,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
|||
mndTransSendRpcRsp(pMnode, pTrans);
|
||||
}
|
||||
|
||||
static int32_t mndProcessTransReq(SRpcMsg *pReq) {
|
||||
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
|
||||
mndTransPullup(pReq->info.node);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -15,10 +15,10 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndVgroup.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
|
@ -896,6 +896,8 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
|||
action.pCont = pHead;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_ALTER_CONFIRM;
|
||||
// incorrect redirect result will cause this erro
|
||||
action.retryCode = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pHead);
|
||||
|
@ -942,6 +944,8 @@ static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbOb
|
|||
action.contLen = contLen;
|
||||
action.msgType = TDMT_SYNC_SET_VNODE_STANDBY;
|
||||
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
// Keep retrying until the target vnode is not the leader
|
||||
action.retryCode = TSDB_CODE_SYN_IS_LEADER;
|
||||
|
||||
if (isRedo) {
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
|
@ -1229,7 +1233,8 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if (req.dnodeId1 == pVgroup->vnodeGid[0].dnodeId) {
|
||||
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
|
||||
// terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1351,7 +1356,8 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if (pNew1 == NULL && pOld1 == NULL && pNew2 == NULL && pOld2 == NULL && pNew3 == NULL && pOld3 == NULL) {
|
||||
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
|
||||
// terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#./test.sh -f tsim/dnode/drop_dnode_has_vnode_replica3.sim
|
||||
#./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica1.sim
|
||||
#./test.sh -f tsim/dnode/drop_dnode_has_multi_vnode_replica3.sim
|
||||
#./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
./test.sh -f tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
|
||||
# ---- insert
|
||||
|
|
|
@ -65,7 +65,7 @@ sql_error redistribute vgroup 3 dnode 6 dnode 3 dnode 4
|
|||
# vgroup not exist
|
||||
sql_error redistribute vgroup 3 dnode 5 dnode 3 dnode 4
|
||||
# un changed
|
||||
sql_error redistribute vgroup 2 dnode 2 dnode 3 dnode 4
|
||||
# sql_error redistribute vgroup 2 dnode 2 dnode 3 dnode 4
|
||||
# no enought vnodes
|
||||
sql_error redistribute vgroup 2 dnode 1 dnode 3 dnode 4
|
||||
# offline vnodes
|
||||
|
@ -176,8 +176,6 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
|
||||
print =============== step32:
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
|
|
Loading…
Reference in New Issue