From 0f04c426d158c405d8a6335079d9a29b8f478f20 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Feb 2022 10:47:58 +0800 Subject: [PATCH] kill trans --- include/util/taoserror.h | 1 + source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 79 ++++++++++++++++++++------ source/util/src/terror.c | 1 + 4 files changed, 64 insertions(+), 19 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0a822927ad..6f9bc35f2a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -246,6 +246,7 @@ int32_t* taosGetErrno(); // mnode-trans #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) +#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c26b027a88..dc475fb2e2 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -144,7 +144,7 @@ typedef struct { int32_t transType; uint64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; - char lastError[TSDB_TRANS_DESC_LEN]; + char lastError[TSDB_TRANS_ERROR_LEN]; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1670fffb8c..8c358c2e97 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -71,7 +71,7 @@ int32_t mndInitTrans(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndTransActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq); - mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessKillTransReq); + mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans); @@ -1004,42 +1004,85 @@ static int32_t mndProcessTransReq(SMnodeMsg *pReq) { return 0; } -static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; +static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { + SArray *pArray = NULL; + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + pArray = pTrans->redoActions; + } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { + pArray = pTrans->undoActions; + } else { + terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE; + return -1; + } + int32_t size = taosArrayGetSize(pArray); + + for (int32_t i = 0; i < size; ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + if (pAction == NULL) continue; + + if (pAction->msgReceived == 0) { + mInfo("trans:%d, action:%d set processed", pTrans->id, i); + pAction->msgSent = 1; + pAction->msgReceived = 1; + pAction->errCode = 0; + } + + if (pAction->errCode != 0) { + mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i, + tstrerror(pAction->errCode)); + pAction->msgSent = 1; + pAction->msgReceived = 1; + pAction->errCode = 0; + } + } + + mndTransExecute(pMnode, pTrans); + return 0; +} + +static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; SKillTransReq killReq = {0}; + int32_t code = -1; + SUserObj *pUser = NULL; + STrans *pTrans = NULL; + if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; - mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); - return -1; + goto KILL_OVER; } mInfo("trans:%d, start to kill", killReq.transId); - SUserObj *pUser = mndAcquireUser(pMnode, pReq->user); + pUser = mndAcquireUser(pMnode, pReq->user); if (pUser == NULL) { - mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); - return -1; + goto KILL_OVER; } if (!pUser->superUser) { - mndReleaseUser(pMnode, pUser); terrno = TSDB_CODE_MND_NO_RIGHTS; - mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); - return -1; + goto KILL_OVER; } - mndReleaseUser(pMnode, pUser); - STrans *pTrans = mndAcquireTrans(pMnode, killReq.transId); + pTrans = mndAcquireTrans(pMnode, killReq.transId); if (pTrans == NULL) { terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); return -1; } - // mndTransDrop(pTrans); + code = mndKillTrans(pMnode, pTrans); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +KILL_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); + return -1; + } + mndReleaseTrans(pMnode, pTrans); - return 0; + return code; } void mndTransPullup(SMnode *pMnode) { @@ -1082,13 +1125,13 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pSchema[cols].bytes = pShow->bytes[cols]; cols++; - pShow->bytes[cols] = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE; + pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "db"); pSchema[cols].bytes = pShow->bytes[cols]; cols++; - pShow->bytes[cols] = TSDB_TRANS_DESC_LEN + VARSTR_HEADER_SIZE; + pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "type"); pSchema[cols].bytes = pShow->bytes[cols]; @@ -1100,7 +1143,7 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pSchema[cols].bytes = pShow->bytes[cols]; cols++; - pShow->bytes[cols] = TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE; + pShow->bytes[cols] = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "last_error"); pSchema[cols].bytes = pShow->bytes[cols]; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 70608e117e..f4ee13c067 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev // mnode-trans TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") // mnode-topic TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")