|
|
|
@ -15,9 +15,9 @@
|
|
|
|
|
|
|
|
|
|
#define _DEFAULT_SOURCE
|
|
|
|
|
#include "mndTrans.h"
|
|
|
|
|
#include "mndPrivilege.h"
|
|
|
|
|
#include "mndConsumer.h"
|
|
|
|
|
#include "mndDb.h"
|
|
|
|
|
#include "mndPrivilege.h"
|
|
|
|
|
#include "mndShow.h"
|
|
|
|
|
#include "mndSync.h"
|
|
|
|
|
#include "mndUser.h"
|
|
|
|
@ -55,7 +55,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
|
|
|
|
|
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); }
|
|
|
|
|
|
|
|
|
|
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
|
|
|
|
|
static int32_t mndProcessTransReq(SRpcMsg *pReq);
|
|
|
|
|
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
|
|
|
|
|
static int32_t mndProcessTtl(SRpcMsg *pReq);
|
|
|
|
|
static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
|
|
|
|
|
|
|
|
|
@ -73,7 +73,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
|
|
|
|
|
.deleteFp = (SdbDeleteFp)mndTransActionDelete,
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransTimer);
|
|
|
|
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
|
|
|
|
|
|
|
|
|
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
|
|
|
@ -139,8 +139,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
|
|
|
|
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
|
|
|
|
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
|
|
|
@ -163,8 +165,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
|
|
|
|
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
|
|
|
|
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
|
|
|
@ -187,8 +191,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
|
|
|
|
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
|
|
|
|
|
if (pAction->actionType == TRANS_ACTION_RAW) {
|
|
|
|
|
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
|
|
|
|
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
|
|
|
@ -291,10 +297,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
|
|
|
|
action.actionType = actionType;
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
|
|
|
|
|
action.stage = stage;
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
|
|
|
|
|
if (action.actionType == TRANS_ACTION_RAW) {
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
|
|
|
@ -324,10 +332,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
|
|
|
|
action.actionType = actionType;
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
|
|
|
|
|
action.stage = stage;
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
|
|
|
|
|
if (action.actionType == TRANS_ACTION_RAW) {
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
|
|
|
@ -357,10 +367,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
|
|
|
|
|
action.actionType = actionType;
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
|
|
|
|
|
action.stage = stage;
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
|
|
|
|
|
if (action.actionType) {
|
|
|
|
|
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
|
|
|
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
|
|
|
@ -463,15 +475,25 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
|
|
|
|
if (fp) {
|
|
|
|
|
(*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
|
|
|
|
|
}
|
|
|
|
|
pTrans->startFunc = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void mndTransDropData(STrans *pTrans) {
|
|
|
|
|
if (pTrans->redoActions != NULL) {
|
|
|
|
|
mndTransDropActions(pTrans->redoActions);
|
|
|
|
|
pTrans->redoActions = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (pTrans->undoActions != NULL) {
|
|
|
|
|
mndTransDropActions(pTrans->undoActions);
|
|
|
|
|
pTrans->undoActions = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (pTrans->commitActions != NULL) {
|
|
|
|
|
mndTransDropActions(pTrans->commitActions);
|
|
|
|
|
pTrans->commitActions = NULL;
|
|
|
|
|
}
|
|
|
|
|
if (pTrans->rpcRsp != NULL) {
|
|
|
|
|
taosMemoryFree(pTrans->rpcRsp);
|
|
|
|
|
pTrans->rpcRsp = NULL;
|
|
|
|
@ -492,6 +514,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
|
|
|
|
|
if (fp) {
|
|
|
|
|
(*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen);
|
|
|
|
|
}
|
|
|
|
|
pTrans->stopFunc = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mndTransDropData(pTrans);
|
|
|
|
@ -805,7 +828,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
sendRsp = true;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 3) {
|
|
|
|
|
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 6) {
|
|
|
|
|
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
|
|
|
|
sendRsp = true;
|
|
|
|
|
}
|
|
|
|
@ -875,8 +898,8 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
|
|
|
|
pAction->errCode = pRsp->code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action,
|
|
|
|
|
pRsp->code, pAction->acceptableCode);
|
|
|
|
|
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
|
|
|
|
|
mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
|
|
|
|
|
mndTransExecute(pMnode, pTrans);
|
|
|
|
|
|
|
|
|
|
_OVER:
|
|
|
|
@ -884,6 +907,21 @@ _OVER:
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
|
|
|
|
pAction->rawWritten = 0;
|
|
|
|
|
pAction->msgSent = 0;
|
|
|
|
|
pAction->msgReceived = 0;
|
|
|
|
|
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
|
|
|
|
|
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
|
|
|
|
|
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
|
|
|
|
|
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
|
|
|
|
|
pAction->id, pAction->epSet.inUse);
|
|
|
|
|
} else {
|
|
|
|
|
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
|
|
|
|
}
|
|
|
|
|
pAction->errCode = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
|
|
|
|
|
int32_t numOfActions = taosArrayGetSize(pArray);
|
|
|
|
|
|
|
|
|
@ -894,18 +932,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
|
|
|
|
|
continue;
|
|
|
|
|
if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue;
|
|
|
|
|
|
|
|
|
|
pAction->rawWritten = 0;
|
|
|
|
|
pAction->msgSent = 0;
|
|
|
|
|
pAction->msgReceived = 0;
|
|
|
|
|
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
|
|
|
|
|
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
|
|
|
|
|
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
|
|
|
|
|
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
|
|
|
|
|
action, pAction->epSet.inUse);
|
|
|
|
|
} else {
|
|
|
|
|
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
|
|
|
|
|
}
|
|
|
|
|
pAction->errCode = 0;
|
|
|
|
|
mndTransResetAction(pMnode, pTrans, pAction);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1112,9 +1139,9 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
|
|
|
|
if (pAction->msgReceived) {
|
|
|
|
|
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
|
|
|
|
code = pAction->errCode;
|
|
|
|
|
pAction->msgSent = 0;
|
|
|
|
|
pAction->msgReceived = 0;
|
|
|
|
|
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
|
|
|
|
|
mndTransResetAction(pMnode, pTrans, pAction);
|
|
|
|
|
} else {
|
|
|
|
|
mDebug("trans:%d, %s:%d execute successfully", pTrans->id, mndTransStr(pAction->stage), action);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
|
|
|
@ -1123,6 +1150,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
|
|
|
|
if (pAction->rawWritten) {
|
|
|
|
|
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
|
|
|
|
code = pAction->errCode;
|
|
|
|
|
} else {
|
|
|
|
|
mDebug("trans:%d, %s:%d write successfully", pTrans->id, mndTransStr(pAction->stage), action);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1156,9 +1185,15 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
|
|
|
|
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
|
|
|
|
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
|
|
|
|
|
break;
|
|
|
|
|
} else if (code == pAction->retryCode) {
|
|
|
|
|
mDebug("trans:%d, %s:%d receive code:0x%x and retry", pTrans->id, mndTransStr(pAction->stage), pAction->id, code);
|
|
|
|
|
taosMsleep(300);
|
|
|
|
|
continue;
|
|
|
|
|
} else {
|
|
|
|
|
terrno = code;
|
|
|
|
|
pTrans->code = code;
|
|
|
|
|
mDebug("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id,
|
|
|
|
|
mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -1343,7 +1378,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
|
|
|
|
mndTransSendRpcRsp(pMnode, pTrans);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t mndProcessTransReq(SRpcMsg *pReq) {
|
|
|
|
|
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
|
|
|
|
|
mndTransPullup(pReq->info.node);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|