set trans type
This commit is contained in:
parent
314116ae11
commit
11c79d1b88
|
@ -105,7 +105,34 @@ typedef enum {
|
|||
} ETrnStage;
|
||||
|
||||
typedef enum {
|
||||
TRN_TYPE_CREATE_DB = 0,
|
||||
TRN_TYPE_START = 0,
|
||||
TRN_TYPE_CREATE_MNODE = 1,
|
||||
TRN_TYPE_DROP_MNODE = 2,
|
||||
TRN_TYPE_CREATE_DNODE = 3,
|
||||
TRN_TYPE_DROP_DNODE = 4,
|
||||
TRN_TYPE_CREATE_SNODE = 5,
|
||||
TRN_TYPE_DROP_SNODE = 6,
|
||||
TRN_TYPE_CREATE_QNODE = 7,
|
||||
TRN_TYPE_DROP_QNODE = 8,
|
||||
TRN_TYPE_CREATE_BNODE = 9,
|
||||
TRN_TYPE_DROP_BNODE = 10,
|
||||
TRN_TYPE_DB_START = 1000,
|
||||
TRN_TYPE_CREATE_DB = 1001,
|
||||
TRN_TYPE_ALTER_DB = 1002,
|
||||
TRN_TYPE_DROP_DB = 1003,
|
||||
TRN_TYPE_CREATE_FUNC = 1004,
|
||||
TRN_TYPE_DROP_FUNC = 1005,
|
||||
TRN_TYPE_CREATE_STB = 1006,
|
||||
TRN_TYPE_ALTER_STB = 1007,
|
||||
TRN_TYPE_DROP_STB = 1008,
|
||||
TRN_TYPE_CREATE_TOPIC = 1009,
|
||||
TRN_TYPE_DROP_TOPIC = 1010,
|
||||
TRN_TYPE_CREATE_USER = 1011,
|
||||
TRN_TYPE_ALTER_USER = 1012,
|
||||
TRN_TYPE_DROP_USER = 1013,
|
||||
TRN_TYPE_SUBSCRIBE = 1014,
|
||||
TRN_TYPE_REBALANCE = 1015,
|
||||
TRN_TYPE_MAX,
|
||||
} ETrnType;
|
||||
|
||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
||||
|
|
|
@ -248,7 +248,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
|||
bnodeObj.createdTime = taosGetTimestampMs();
|
||||
bnodeObj.updateTime = bnodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_BNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -366,7 +366,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
|
|||
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_BNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -434,7 +434,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_DB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||
|
@ -620,7 +620,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto UPDATE_DB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
|
||||
|
@ -799,7 +799,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
|||
|
||||
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_DB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
||||
|
|
|
@ -440,7 +440,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq *
|
|||
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
|
||||
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
|
||||
return -1;
|
||||
|
@ -516,7 +516,7 @@ CREATE_DNODE_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -206,7 +206,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pC
|
|||
memcpy(func.pComment, pCreate->pComment, pCreate->commentSize);
|
||||
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_FUNC_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
|
||||
|
@ -236,7 +236,7 @@ CREATE_FUNC_OVER:
|
|||
|
||||
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_FUNC_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
|
||||
|
|
|
@ -359,7 +359,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
|||
mnodeObj.createdTime = taosGetTimestampMs();
|
||||
mnodeObj.updateTime = mnodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_MNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -526,7 +526,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_MNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
|||
qnodeObj.createdTime = taosGetTimestampMs();
|
||||
qnodeObj.updateTime = qnodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_QNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_QNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -366,7 +366,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
|
|||
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_QNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -248,7 +248,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
|||
snodeObj.createdTime = taosGetTimestampMs();
|
||||
snodeObj.updateTime = snodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_SNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -368,7 +368,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
|
|||
static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_SNODE_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -530,7 +530,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto CREATE_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||
|
@ -1021,7 +1021,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
|
|||
if (code != 0) goto ALTER_STB_OVER;
|
||||
|
||||
code = -1;
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_STB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto ALTER_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||
|
@ -1159,7 +1159,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto DROP_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
|
|
|
@ -369,8 +369,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
|
||||
void *pIter = NULL;
|
||||
|
||||
mInfo("mq rebalance start");
|
||||
|
@ -969,7 +969,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
oldTopicNum = taosArrayGetSize(oldSub);
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
// TODO: free memory
|
||||
return -1;
|
||||
|
|
|
@ -252,7 +252,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
|
|||
topicObj.logicalPlan = pCreate->logicalPlan;
|
||||
topicObj.sqlLen = strlen(pCreate->sql);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
|
@ -343,7 +343,7 @@ CREATE_TOPIC_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
|
|||
userObj.updateTime = userObj.createdTime;
|
||||
userObj.superUser = pCreate->superUser;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_CREATE_USER, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||
return -1;
|
||||
|
@ -350,7 +350,7 @@ CREATE_USER_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to update since %s", pOld->user, terrstr());
|
||||
return -1;
|
||||
|
@ -511,7 +511,7 @@ ALTER_USER_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
||||
return -1;
|
||||
|
|
Loading…
Reference in New Issue