kill trans

This commit is contained in:
Shengliang Guan 2022-02-18 10:47:58 +08:00
parent 4a27d2cc75
commit 0f04c426d1
4 changed files with 64 additions and 19 deletions

View File

@ -246,6 +246,7 @@ int32_t* taosGetErrno();
// mnode-trans
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
// mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)

View File

@ -144,7 +144,7 @@ typedef struct {
int32_t transType;
uint64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_DESC_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
} STrans;
typedef struct {

View File

@ -71,7 +71,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessKillTransReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
@ -1004,42 +1004,85 @@ static int32_t mndProcessTransReq(SMnodeMsg *pReq) {
return 0;
}
static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
SArray *pArray = NULL;
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
pArray = pTrans->redoActions;
} else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
pArray = pTrans->undoActions;
} else {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
return -1;
}
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction == NULL) continue;
if (pAction->msgReceived == 0) {
mInfo("trans:%d, action:%d set processed", pTrans->id, i);
pAction->msgSent = 1;
pAction->msgReceived = 1;
pAction->errCode = 0;
}
if (pAction->errCode != 0) {
mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i,
tstrerror(pAction->errCode));
pAction->msgSent = 1;
pAction->msgReceived = 1;
pAction->errCode = 0;
}
}
mndTransExecute(pMnode, pTrans);
return 0;
}
static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SKillTransReq killReq = {0};
int32_t code = -1;
SUserObj *pUser = NULL;
STrans *pTrans = NULL;
if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
return -1;
goto KILL_OVER;
}
mInfo("trans:%d, start to kill", killReq.transId);
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
return -1;
goto KILL_OVER;
}
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
return -1;
goto KILL_OVER;
}
mndReleaseUser(pMnode, pUser);
STrans *pTrans = mndAcquireTrans(pMnode, killReq.transId);
pTrans = mndAcquireTrans(pMnode, killReq.transId);
if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
return -1;
}
// mndTransDrop(pTrans);
code = mndKillTrans(pMnode, pTrans);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
KILL_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
return -1;
}
mndReleaseTrans(pMnode, pTrans);
return 0;
return code;
}
void mndTransPullup(SMnode *pMnode) {
@ -1082,13 +1125,13 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE;
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "db");
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_TRANS_DESC_LEN + VARSTR_HEADER_SIZE;
pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "type");
pSchema[cols].bytes = pShow->bytes[cols];
@ -1100,7 +1143,7 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE;
pShow->bytes[cols] = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "last_error");
pSchema[cols].bytes = pShow->bytes[cols];

View File

@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev
// mnode-trans
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
// mnode-topic
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")