diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b8ed29b2a5..48e352bb48 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -977,6 +977,13 @@ typedef struct { int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); +typedef struct { + int32_t transId; +} SKillTransReq; + +int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq); +int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq); + typedef struct { char user[TSDB_USER_LEN]; char spi; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9e5613ef03..1a63ea73a5 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -136,6 +136,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0a822927ad..6f9bc35f2a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -246,6 +246,7 @@ int32_t* taosGetErrno(); // mnode-trans #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) +#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4b6171362e..ba7ce14466 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2239,6 +2239,31 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) return 0; } +int32_t tSerializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->transId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->transId) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index b6264e6db9..125b0d3191 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -27,6 +27,7 @@ void mndCleanupDb(SMnode *pMnode); SDbObj *mndAcquireDb(SMnode *pMnode, const char *db); void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen); +char *mnGetDbStr(char *src); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index abc802c588..a092882e1f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -105,7 +105,39 @@ typedef enum { } ETrnStage; typedef enum { - TRN_TYPE_CREATE_DB = 0, + TRN_TYPE_BASIC_SCOPE = 1000, + TRN_TYPE_CREATE_USER = 1001, + TRN_TYPE_ALTER_USER = 1002, + TRN_TYPE_DROP_USER = 1003, + TRN_TYPE_CREATE_FUNC = 1004, + TRN_TYPE_DROP_FUNC = 1005, + TRN_TYPE_CREATE_SNODE = 1006, + TRN_TYPE_DROP_SNODE = 1007, + TRN_TYPE_CREATE_QNODE = 1008, + TRN_TYPE_DROP_QNODE = 1009, + TRN_TYPE_CREATE_BNODE = 1010, + TRN_TYPE_DROP_BNODE = 1011, + TRN_TYPE_CREATE_MNODE = 1012, + TRN_TYPE_DROP_MNODE = 1013, + TRN_TYPE_CREATE_TOPIC = 1014, + TRN_TYPE_DROP_TOPIC = 1015, + TRN_TYPE_SUBSCRIBE = 1016, + TRN_TYPE_REBALANCE = 1017, + TRN_TYPE_BASIC_SCOPE_END, + TRN_TYPE_GLOBAL_SCOPE = 2000, + TRN_TYPE_CREATE_DNODE = 2001, + TRN_TYPE_DROP_DNODE = 2002, + TRN_TYPE_GLOBAL_SCOPE_END, + TRN_TYPE_DB_SCOPE = 3000, + TRN_TYPE_CREATE_DB = 3001, + TRN_TYPE_ALTER_DB = 3002, + TRN_TYPE_DROP_DB = 3003, + TRN_TYPE_CREATE_STB = 3004, + TRN_TYPE_ALTER_STB = 3005, + TRN_TYPE_DROP_STB = 3006, + TRN_TYPE_SPLIT_VGROUP = 3007, + TRN_TYPE_MERGE_VGROUP = 3018, + TRN_TYPE_DB_SCOPE_END, } ETrnType; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; @@ -128,6 +160,7 @@ typedef struct { int32_t id; ETrnStage stage; ETrnPolicy policy; + ETrnType transType; int32_t code; int32_t failedTimes; void* rpcHandle; @@ -141,10 +174,9 @@ typedef struct { SArray* undoActions; int64_t createdTime; int64_t lastExecTime; - int32_t transType; uint64_t dbUid; - char dbname[TSDB_DB_NAME_LEN]; - char lastError[TSDB_TRANS_DESC_LEN]; + char dbname[TSDB_DB_FNAME_LEN]; + char lastError[TSDB_TRANS_ERROR_LEN]; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index f1a213790c..a0218eaf65 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -36,7 +36,7 @@ typedef struct { int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq); +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq); void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); @@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); +void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransProcessRsp(SMnodeMsg *pRsp); diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index fd029212f5..a992135464 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -21,7 +21,7 @@ #include "mndTrans.h" #include "mndUser.h" -#define TSDB_BNODE_VER_NUMBER 1 +#define TSDB_BNODE_VER_NUMBER 1 #define TSDB_BNODE_RESERVE_SIZE 64 static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj); @@ -248,7 +248,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode bnodeObj.createdTime = taosGetTimestampMs(); bnodeObj.updateTime = bnodeObj.createdTime; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_BNODE_OVER; mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId); @@ -366,7 +366,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_BNODE_OVER; mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index eb5f39e983..122982b7f2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -23,7 +23,7 @@ #include "mndUser.h" #include "mndVgroup.h" -#define TSDB_DB_VER_NUMBER 1 +#define TSDB_DB_VER_NUMBER 1 #define TSDB_DB_RESERVE_SIZE 64 static SSdbRaw *mndDbActionEncode(SDbObj *pDb); @@ -434,11 +434,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_DB_OVER; mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); + mndTransSetDbInfo(pTrans, &dbObj); if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; @@ -620,11 +621,12 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg); if (pTrans == NULL) goto UPDATE_DB_OVER; mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name); + mndTransSetDbInfo(pTrans, pOld); if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER; @@ -799,10 +801,11 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_DB_OVER; mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); + mndTransSetDbInfo(pTrans, pDb); if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 6d0b8e7c8d..39ea4b482c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -440,7 +440,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq * memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, &pReq->rpcMsg); if (pTrans == NULL) { mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr()); return -1; @@ -516,7 +516,7 @@ CREATE_DNODE_OVER: } static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, &pReq->rpcMsg); if (pTrans == NULL) { mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 9dbb0fc2d0..b2daf848c5 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -21,7 +21,7 @@ #include "mndTrans.h" #include "mndUser.h" -#define SDB_FUNC_VER 1 +#define SDB_FUNC_VER 1 #define SDB_FUNC_RESERVE_SIZE 64 static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc); @@ -206,7 +206,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pC memcpy(func.pComment, pCreate->pComment, pCreate->commentSize); memcpy(func.pCode, pCreate->pCode, func.codeSize); - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_FUNC_OVER; mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name); @@ -236,7 +236,7 @@ CREATE_FUNC_OVER: static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_FUNC_OVER; mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 720dc1b535..cbb31bc504 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -21,7 +21,7 @@ #include "mndTrans.h" #include "mndUser.h" -#define TSDB_MNODE_VER_NUMBER 1 +#define TSDB_MNODE_VER_NUMBER 1 #define TSDB_MNODE_RESERVE_SIZE 64 static int32_t mndCreateDefaultMnode(SMnode *pMnode); @@ -359,7 +359,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_MNODE_OVER; mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); @@ -526,7 +526,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_MNODE_OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index f1335ad7e4..0c227b0db9 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode qnodeObj.createdTime = taosGetTimestampMs(); qnodeObj.updateTime = qnodeObj.createdTime; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_QNODE, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_QNODE_OVER; mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId); @@ -366,7 +366,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_QNODE_OVER; mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id); diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 5904ca0502..6040aa088c 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -21,7 +21,7 @@ #include "mndTrans.h" #include "mndUser.h" -#define TSDB_SNODE_VER_NUMBER 1 +#define TSDB_SNODE_VER_NUMBER 1 #define TSDB_SNODE_RESERVE_SIZE 64 static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj); @@ -248,7 +248,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode snodeObj.createdTime = taosGetTimestampMs(); snodeObj.updateTime = snodeObj.createdTime; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_SNODE_OVER; mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId); @@ -368,7 +368,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_SNODE_OVER; mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c5b18bcde0..274ae10045 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -530,10 +530,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg); if (pTrans == NULL) goto CREATE_STB_OVER; mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); + mndTransSetDbInfo(pTrans, pDb); if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER; @@ -1021,10 +1022,11 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq * if (code != 0) goto ALTER_STB_OVER; code = -1; - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_STB, &pReq->rpcMsg); if (pTrans == NULL) goto ALTER_STB_OVER; mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name); + mndTransSetDbInfo(pTrans, pDb); if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER; @@ -1159,10 +1161,11 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg); if (pTrans == NULL) goto DROP_STB_OVER; mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name); + mndTransSetDbInfo(pTrans, pDb); if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ceb31149ca..6169aaa78a 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -369,8 +369,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); void *pIter = NULL; mInfo("mq rebalance start"); @@ -969,7 +969,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { oldTopicNum = taosArrayGetSize(oldSub); } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg); if (pTrans == NULL) { // TODO: free memory return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index deac89d68a..d2318009d5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -252,7 +252,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq topicObj.logicalPlan = pCreate->logicalPlan; topicObj.sqlLen = strlen(pCreate->sql); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); return -1; @@ -343,7 +343,7 @@ CREATE_TOPIC_OVER: } static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 99a5a3a475..5fcdc6d8f2 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndTrans.h" #include "mndAuth.h" +#include "mndDb.h" #include "mndShow.h" #include "mndSync.h" #include "mndUser.h" @@ -54,7 +55,8 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); -static int32_t mndProcessTransReq(SMnodeMsg *pMsg); +static int32_t mndProcessTransReq(SMnodeMsg *pReq); +static int32_t mndProcessKillTransReq(SMnodeMsg *pReq); static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); @@ -70,6 +72,7 @@ int32_t mndInitTrans(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTransActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq); + mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans); @@ -122,8 +125,12 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER) - SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER) - SDB_SET_INT8(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->transType, TRANS_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, TRANS_ENCODE_OVER) + SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER) @@ -214,11 +221,32 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans = sdbGetRowObj(pRow); if (pTrans == NULL) goto TRANS_DECODE_OVER; - pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); - pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); + + SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER) + + int16_t type = 0; + int16_t policy = 0; + int16_t stage = 0; + SDB_GET_INT16(pRaw, dataPos, &policy, TRANS_DECODE_OVER) + SDB_GET_INT16(pRaw, dataPos, &stage, TRANS_DECODE_OVER) + SDB_GET_INT16(pRaw, dataPos, &type, TRANS_DECODE_OVER) + pTrans->policy = policy; + pTrans->stage = stage; + pTrans->transType = type; + SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, TRANS_DECODE_OVER) + SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, TRANS_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER) + + pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *)); + pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *)); + pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *)); + pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction)); if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER; if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER; @@ -226,15 +254,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER; if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER; - SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER) - SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER) - SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->stage, TRANS_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER) - SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER) - for (int32_t i = 0; i < redoLogNum; ++i) { SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER) pData = malloc(dataLen); @@ -392,7 +411,7 @@ static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { sdbRelease(pSdb, pTrans); } -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) { +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -403,6 +422,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) { pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; + pTrans->transType = type; pTrans->rpcHandle = pReq->handle; pTrans->rpcAHandle = pReq->ahandle; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -494,6 +514,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) { pTrans->rpcRspLen = contLen; } +void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { + pTrans->dbUid = pDb->uid; + memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); +} + static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { @@ -994,6 +1019,87 @@ static int32_t mndProcessTransReq(SMnodeMsg *pReq) { return 0; } +static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { + SArray *pArray = NULL; + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + pArray = pTrans->redoActions; + } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { + pArray = pTrans->undoActions; + } else { + terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE; + return -1; + } + + int32_t size = taosArrayGetSize(pArray); + + for (int32_t i = 0; i < size; ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + if (pAction == NULL) continue; + + if (pAction->msgReceived == 0) { + mInfo("trans:%d, action:%d set processed", pTrans->id, i); + pAction->msgSent = 1; + pAction->msgReceived = 1; + pAction->errCode = 0; + } + + if (pAction->errCode != 0) { + mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i, + tstrerror(pAction->errCode)); + pAction->msgSent = 1; + pAction->msgReceived = 1; + pAction->errCode = 0; + } + } + + mndTransExecute(pMnode, pTrans); + return 0; +} + +static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SKillTransReq killReq = {0}; + int32_t code = -1; + SUserObj *pUser = NULL; + STrans *pTrans = NULL; + + if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto KILL_OVER; + } + + mInfo("trans:%d, start to kill", killReq.transId); + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto KILL_OVER; + } + + if (!pUser->superUser) { + terrno = TSDB_CODE_MND_NO_RIGHTS; + goto KILL_OVER; + } + + pTrans = mndAcquireTrans(pMnode, killReq.transId); + if (pTrans == NULL) { + terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + + code = mndKillTrans(pMnode, pTrans); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +KILL_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + + mndReleaseTrans(pMnode, pTrans); + return code; +} + void mndTransPullup(SMnode *pMnode) { STrans *pTrans = NULL; void *pIter = NULL; @@ -1099,7 +1205,12 @@ static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, in cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, pTrans->dbname); + char *name = mnGetDbStr(pTrans->dbname); + if (name != NULL) { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); + } else { + STR_TO_VARSTR(pWrite, "-"); + } cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index e40f76daa1..7cdae79c0d 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate userObj.updateTime = userObj.createdTime; userObj.superUser = pCreate->superUser; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_CREATE_USER, &pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to create since %s", pCreate->user, terrstr()); return -1; @@ -350,7 +350,7 @@ CREATE_USER_OVER: } static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to update since %s", pOld->user, terrstr()); return -1; @@ -511,7 +511,7 @@ ALTER_USER_OVER: } static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to drop since %s", pUser->user, terrstr()); return -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 70608e117e..f4ee13c067 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev // mnode-trans TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") // mnode-topic TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")