diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b8ed29b2a5..48e352bb48 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -977,6 +977,13 @@ typedef struct { int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq); +typedef struct { + int32_t transId; +} SKillTransReq; + +int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq); +int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq); + typedef struct { char user[TSDB_USER_LEN]; char spi; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9e5613ef03..1a63ea73a5 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -136,6 +136,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4b6171362e..ba7ce14466 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2239,6 +2239,31 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) return 0; } +int32_t tSerializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->transId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->transId) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index ffde29e6a7..1670fffb8c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -54,7 +54,8 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans); -static int32_t mndProcessTransReq(SMnodeMsg *pMsg); +static int32_t mndProcessTransReq(SMnodeMsg *pReq); +static int32_t mndProcessKillTransReq(SMnodeMsg *pReq); static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); @@ -70,6 +71,7 @@ int32_t mndInitTrans(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTransActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq); + mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessKillTransReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans); @@ -1002,6 +1004,44 @@ static int32_t mndProcessTransReq(SMnodeMsg *pReq) { return 0; } +static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + + SKillTransReq killReq = {0}; + if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + + mInfo("trans:%d, start to kill", killReq.transId); + + SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + + if (!pUser->superUser) { + mndReleaseUser(pMnode, pUser); + terrno = TSDB_CODE_MND_NO_RIGHTS; + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + mndReleaseUser(pMnode, pUser); + + STrans *pTrans = mndAcquireTrans(pMnode, killReq.transId); + if (pTrans == NULL) { + terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + + // mndTransDrop(pTrans); + mndReleaseTrans(pMnode, pTrans); + return 0; +} + void mndTransPullup(SMnode *pMnode) { STrans *pTrans = NULL; void *pIter = NULL;