diff --git a/include/common/systable.h b/include/common/systable.h index bd8ba76f4f..fe867c9ad0 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -63,6 +63,7 @@ extern "C" { #define TSDB_INS_TABLE_TSMAS "ins_tsmas" #define TSDB_INS_DISK_USAGE "ins_disk_usage" #define TSDB_INS_TABLE_FILESETS "ins_filesets" +#define TSDB_INS_TABLE_TRANSACTION_DETAILS "ins_transaction_details" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0b6a8b3f1b..b50291c2e7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -163,6 +163,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_ANODE_FULL, TSDB_MGMT_TABLE_USAGE, TSDB_MGMT_TABLE_FILESETS, + TSDB_MGMT_TABLE_TRANSACTION_DETAIL, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -393,6 +394,7 @@ typedef enum ENodeType { QUERY_NODE_SHOW_VIEWS_STMT, QUERY_NODE_SHOW_COMPACTS_STMT, QUERY_NODE_SHOW_COMPACT_DETAILS_STMT, + QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT, QUERY_NODE_SHOW_GRANTS_FULL_STMT, QUERY_NODE_SHOW_GRANTS_LOGS_STMT, QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT, diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 8eb30b8184..0f736e7068 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -439,6 +439,11 @@ typedef struct SShowCompactDetailsStmt { SNode* pCompactId; } SShowCompactDetailsStmt; +typedef struct SShowTransactionDetailsStmt { + ENodeType type; + SNode* pTransactionId; +} SShowTransactionDetailsStmt; + typedef enum EIndexType { INDEX_TYPE_SMA = 1, INDEX_TYPE_FULLTEXT, INDEX_TYPE_NORMAL } EIndexType; typedef struct SIndexOptions { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 64ef0b3829..2e9b3a0d7f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -411,6 +411,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_MND_TRANS_CTX_SWITCH TAOS_DEF_ERROR_CODE(0, 0x03D8) #define TSDB_CODE_MND_TRANS_CONFLICT_COMPACT TAOS_DEF_ERROR_CODE(0, 0x03D9) #define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF) +#define TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED TAOS_DEF_ERROR_CODE(0, 0x03D2) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/include/util/tdef.h b/include/util/tdef.h index 2ee84b42bd..2b0aa00b1a 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -329,7 +329,11 @@ typedef enum ELogicConditionType { #define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_TYPE_LEN 16 -#define TSDB_TRANS_ERROR_LEN 512 +#define TSDB_TRANS_ERROR_LEN 512 +#define TSDB_TRANS_OBJTYPE_LEN 40 +#define TSDB_TRANS_RESULT_LEN 100 +#define TSDB_TRANS_TARGET_LEN 300 +#define TSDB_TRANS_DETAIL_LEN 100 #define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_DESC_LEN 128 diff --git a/source/common/src/systable.c b/source/common/src/systable.c index cb08046399..aabf204cd7 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -314,6 +314,8 @@ static const SSysDbTableSchema transSchema[] = { {.name = "oper", .bytes = TSDB_TRANS_OPER_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stable", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "killable", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "kill_mnode", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, @@ -403,6 +405,15 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = { {.name = "remain_time(s)", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, }; +static const SSysDbTableSchema userTransactionDetailSchema[] = { + {.name = "transaction_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "action", .bytes = 30 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "obj_type", .bytes = TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "result", .bytes = TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "target", .bytes = TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "detail", .bytes = TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; + static const SSysDbTableSchema anodesSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "url", .bytes = TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, @@ -521,6 +532,7 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_ANODES_FULL, anodesFullSchema, tListLen(anodesFullSchema), true}, {TSDB_INS_DISK_USAGE, diskUsageSchema, tListLen(diskUsageSchema), false}, {TSDB_INS_TABLE_FILESETS, filesetsFullSchema, tListLen(filesetsFullSchema), false}, + {TSDB_INS_TABLE_TRANSACTION_DETAILS, userTransactionDetailSchema, tListLen(userTransactionDetailSchema), false}, }; static const SSysDbTableSchema connectionsSchema[] = { diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ccc6439b5d..065b0517ee 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -703,6 +703,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) { return TSDB_CODE_INVALID_MSG; } + dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId); #if 0 if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) { code = TSDB_CODE_MND_NO_RIGHTS; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e3d2ad6d34..b32274e0a7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -133,6 +133,12 @@ typedef enum { TRN_EXEC_SERIAL = 1, } ETrnExec; +typedef enum { + TRN_KILL_MODE_SKIP = 0, + TRN_KILL_MODE_INTERUPT = 1, + //TRN_KILL_MODE_ROLLBACK = 2, +} ETrnKillMode; + typedef enum { DND_REASON_ONLINE = 0, DND_REASON_STATUS_MSG_TIMEOUT, @@ -201,6 +207,8 @@ typedef struct { SRWLatch lockRpcArray; int64_t mTraceId; TdThreadMutex mutex; + bool ableToBeKilled; + ETrnKillMode killMode; } STrans; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 7f039bc21f..05280d0d68 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -54,6 +54,8 @@ typedef struct { SSdbRaw *pRaw; int64_t mTraceId; + int64_t startTime; + int64_t endTime; } STransAction; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); @@ -80,6 +82,8 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, voi void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId); void mndTransSetSerial(STrans *pTrans); +void mndTransSetBeKilled(STrans *pTrans, bool ableToBeKilled); +void mndTransSetKillMode(STrans *pTrans, ETrnKillMode killMode); void mndTransSetParallel(STrans *pTrans); void mndTransSetChangeless(STrans *pTrans); void mndTransSetOper(STrans *pTrans, EOperType oper); diff --git a/source/dnode/mnode/impl/src/mndCompactDetail.c b/source/dnode/mnode/impl/src/mndCompactDetail.c index 9a053066b2..0052f5de56 100644 --- a/source/dnode/mnode/impl/src/mndCompactDetail.c +++ b/source/dnode/mnode/impl/src/mndCompactDetail.c @@ -45,6 +45,8 @@ int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB char *sep = NULL; SDbObj *pDb = NULL; + mInfo("retrieve compact detail"); + if (strlen(pShow->db) > 0) { sep = strchr(pShow->db, '.'); if (sep && diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 9d02fdf115..1d2b2c262b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1017,7 +1017,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER); TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser, dnodeList), &lino, _OVER); - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + //if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; SName name = {0}; if (tNameFromString(&name, createReq.db, T_NAME_ACCT | T_NAME_DB) < 0) @@ -1286,6 +1286,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p TAOS_RETURN(code); } mInfo("trans:%d, used to alter db:%s", pTrans->id, pOld->name); + mInfo("trans:%d, used to alter db, ableToBeKilled:%d, killMode:%d", pTrans->id, pTrans->ableToBeKilled, pTrans->killMode); mndTransSetDbName(pTrans, pOld->name, NULL); TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); @@ -1294,6 +1295,8 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew), NULL, _OVER); + + mInfo("trans:%d, used to alter db, ableToBeKilled:%d, killMode:%d", pTrans->id, pTrans->ableToBeKilled, pTrans->killMode); TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 49dc62d471..f19eabd885 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -132,6 +132,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_COMPACT; } else if (strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, len) == 0) { type = TSDB_MGMT_TABLE_COMPACT_DETAIL; + } else if (strncasecmp(name, TSDB_INS_TABLE_TRANSACTION_DETAILS, len) == 0) { + type = TSDB_MGMT_TABLE_TRANSACTION_DETAIL; } else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_FULL, len) == 0) { type = TSDB_MGMT_TABLE_GRANTS_FULL; } else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_LOGS, len) == 0) { @@ -236,7 +238,8 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { SRetrieveTableReq retrieveReq = {0}; TAOS_CHECK_RETURN(tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq)); - mDebug("process to retrieve systable req db:%s, tb:%s", retrieveReq.db, retrieveReq.tb); + mDebug("process to retrieve systable req db:%s, tb:%s, compactId:%" PRId64, retrieveReq.db, retrieveReq.tb, + retrieveReq.compactId); if (retrieveReq.showId == 0) { STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb)); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 718b7d0df6..bbdb510344 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -22,9 +22,11 @@ #include "mndSync.h" #include "mndTrans.h" #include "mndUser.h" +#include "mndVgroup.h" #define TRANS_VER1_NUMBER 1 #define TRANS_VER2_NUMBER 2 +#define TRANS_VER3_NUMBER 3 #define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 44 @@ -70,7 +72,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq); static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); - +static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t tsMaxTransId = 0; int32_t mndInitTrans(SMnode *pMnode) { @@ -89,6 +91,7 @@ int32_t mndInitTrans(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANSACTION_DETAIL, mndRetrieveTransDetail); return sdbSetTable(pMnode->pSdb, table); } @@ -156,7 +159,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { int32_t code = 0; int32_t lino = 0; terrno = TSDB_CODE_INVALID_MSG; - int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER; + int8_t sver = TRANS_VER3_NUMBER; int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen; rawDataLen += mndTransGetActionsSize(pTrans->prepareActions); @@ -220,6 +223,11 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { pIter = taosHashIterate(pTrans->arbGroupIds, pIter); } + if (sver > TRANS_VER1_NUMBER) { + SDB_SET_INT8(pRaw, dataPos, pTrans->ableToBeKilled, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->killMode, _OVER) + } + SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) @@ -310,7 +318,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != TRANS_VER1_NUMBER && sver != TRANS_VER2_NUMBER) { + if (sver > TRANS_VER3_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -389,6 +397,15 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { if ((terrno = taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0)) != 0) goto _OVER; } + if (sver > TRANS_VER2_NUMBER) { + int8_t ableKill = 0; + int8_t killMode = 0; + SDB_GET_INT8(pRaw, dataPos, &ableKill, _OVER) + SDB_GET_INT8(pRaw, dataPos, &killMode, _OVER) + pTrans->ableToBeKilled = ableKill; + pTrans->killMode = killMode; + } + SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) terrno = 0; @@ -430,12 +447,25 @@ static const char *mndTransStr(ETrnStage stage) { } } +static const char *mndTransTypeStr(ETrnAct actionType) { + switch (actionType) { + case TRANS_ACTION_MSG: + return "msg"; + case TRANS_ACTION_RAW: + return "sdb"; + default: + return "invalid"; + } +} + static void mndSetTransLastAction(STrans *pTrans, STransAction *pAction) { if (pAction != NULL) { - pTrans->lastAction = pAction->id; - pTrans->lastMsgType = pAction->msgType; - pTrans->lastEpset = pAction->epSet; - pTrans->lastErrorNo = pAction->errCode; + if (pAction->errCode != TSDB_CODE_ACTION_IN_PROGRESS) { + pTrans->lastAction = pAction->id; + pTrans->lastMsgType = pAction->msgType; + pTrans->lastEpset = pAction->epSet; + pTrans->lastErrorNo = pAction->errCode; + } } else { pTrans->lastAction = 0; pTrans->lastMsgType = 0; @@ -636,6 +666,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->policy = policy; pTrans->conflict = conflict; pTrans->exec = TRN_EXEC_PARALLEL; + pTrans->ableToBeKilled = false; pTrans->createdTime = taosGetTimestampMs(); pTrans->prepareActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); @@ -804,6 +835,13 @@ void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) { void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } +void mndTransSetBeKilled(STrans *pTrans, bool ableToBeKilled) { pTrans->ableToBeKilled = ableToBeKilled; } + +void mndTransSetKillMode(STrans *pTrans, ETrnKillMode killMode) { + pTrans->ableToBeKilled = true; + pTrans->killMode = killMode; +} + void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; } void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; } @@ -1043,6 +1081,39 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return TSDB_CODE_INVALID_PARA; } + mInfo("trans:%d, action list:", pTrans->id); + int32_t index = 0; + for (int32_t i = 0; i < taosArrayGetSize(pTrans->prepareActions); ++i, ++index) { + STransAction *pAction = taosArrayGet(pTrans->prepareActions, i); + mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index, + mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); + } + + for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i, ++index) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + mInfo("trans:%d, action:%d, %s:%d msgType:%s", pTrans->id, index, + mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));; + } + + for (int32_t i = 0; i < taosArrayGetSize(pTrans->commitActions); ++i, ++index) { + STransAction *pAction = taosArrayGet(pTrans->commitActions, i); + mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index, + mndTransStr(pAction->stage), i, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); + } + + for (int32_t i = 0; i < taosArrayGetSize(pTrans->undoActions); ++i, ++index) { + STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + if(pAction->actionType == TRANS_ACTION_MSG){ + mInfo("trans:%d, action:%d, %s:%d msgType:%s", pTrans->id, index, + mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));; + } + else{ + mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index, + mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); + } + } + + TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); TAOS_CHECK_RETURN(mndTransCheckParallelActions(pMnode, pTrans)); @@ -1260,7 +1331,9 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { if (pAction != NULL) { pAction->msgReceived = 1; pAction->errCode = pRsp->code; - pTrans->lastErrorNo = pRsp->code; + pAction->endTime = taosGetTimestampMs(); + // pTrans->lastErrorNo = pRsp->code; + mndSetTransLastAction(pTrans, pAction); mInfo("trans:%d, %s:%d response is received, received code:0x%x(%s), accept:0x%x(%s) retry:0x%x(%s)", transId, mndTransStr(pAction->stage), action, pRsp->code, tstrerror(pRsp->code), pAction->acceptableCode, @@ -1374,6 +1447,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio pAction->msgSent = 1; // pAction->msgReceived = 0; pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS; + pAction->startTime = taosGetTimestampMs(); + pAction->endTime = 0; mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail); mndSetTransLastAction(pTrans, pAction); @@ -1527,8 +1602,9 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pActions, action); - mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d", pTrans->id, pTrans->actionPos, - mndTransStr(pAction->stage), pAction->actionType); + mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d, msgSent:%d, msgReceived:%d", + pTrans->id, pTrans->actionPos, mndTransStr(pAction->stage), pAction->actionType, pAction->msgSent, + pAction->msgReceived); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { @@ -1924,13 +2000,25 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { TAOS_RETURN(TSDB_CODE_MND_TRANS_INVALID_STAGE); } - for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - STransAction *pAction = taosArrayGet(pArray, i); - mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id, - mndTransStr(pAction->stage), i, tstrerror(pAction->errCode)); - pAction->msgSent = 1; - pAction->msgReceived = 1; - pAction->errCode = 0; + if(pTrans->ableToBeKilled == false){ + return TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED; + } + + if(pTrans->killMode == TRN_KILL_MODE_SKIP){ + for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id, + mndTransStr(pAction->stage), i, tstrerror(pAction->errCode)); + pAction->msgSent = 1; + pAction->msgReceived = 1; + pAction->errCode = 0; + } + } + else if(pTrans->killMode == TRN_KILL_MODE_INTERUPT){ + pTrans->stage = TRN_STAGE_PRE_FINISH; + } + else{ + return TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED; } mndTransExecute(pMnode, pTrans); @@ -2002,6 +2090,85 @@ void mndTransPullup(SMnode *pMnode) { taosArrayDestroy(pArray); } +static void mndTransLogAction(STrans *pTrans) { + char detail[512] = {0}; + int32_t len = 0; + int32_t index = 0; + + if (pTrans->stage == TRN_STAGE_PREPARE) { + for (int32_t i = 0; i < taosArrayGetSize(pTrans->prepareActions); ++i, ++index) { + len = 0; + STransAction *pAction = taosArrayGet(pTrans->prepareActions, i); + len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index, + mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), + sdbStatusName(pAction->pRaw->status)); + mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail); + } + } + + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i, ++index) { + len = 0; + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + if (pAction->actionType == TRANS_ACTION_MSG) { + char bufStart[40] = {0}; + taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI); + + char endStart[40] = {0}; + taosFormatUtcTime(endStart, sizeof(endStart), pAction->endTime, TSDB_TIME_PRECISION_MILLI); + len += snprintf(detail + len, sizeof(detail) - len, + "action:%d, %s:%d msgType:%s," + "sent:%d, received:%d, startTime:%s, endTime:%s, ", + index, mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType), pAction->msgSent, + pAction->msgReceived, bufStart, endStart); + + SEpSet epset = pAction->epSet; + if (epset.numOfEps > 0) { + len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse); + for (int32_t i = 0; i < epset.numOfEps; ++i) { + len += + snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port); + } + } + + len += snprintf(detail + len, sizeof(detail) - len, ", errCode:0x%x(%s)\n", pAction->errCode & 0xFFFF, + tstrerror(pAction->errCode)); + } else { + len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s, written:%d\n", + index, mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), + sdbStatusName(pAction->pRaw->status), pAction->rawWritten); + } + mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail); + } + } + + if (pTrans->stage == TRN_STAGE_COMMIT_ACTION) { + for (int32_t i = 0; i < taosArrayGetSize(pTrans->commitActions); ++i, ++index) { + len = 0; + STransAction *pAction = taosArrayGet(pTrans->commitActions, i); + len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index, + mndTransStr(pAction->stage), i, sdbTableName(pAction->pRaw->type), + sdbStatusName(pAction->pRaw->status)); + mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail); + } + + for (int32_t i = 0; i < taosArrayGetSize(pTrans->undoActions); ++i, ++index) { + len = 0; + STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + if (pAction->actionType == TRANS_ACTION_MSG) { + len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d msgType:%s\n", index, + mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType)); + ; + } else { + len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index, + mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), + sdbStatusName(pAction->pRaw->status)); + } + mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail); + } + } +} + static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -2044,6 +2211,18 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbname, false), pTrans, &lino, _OVER); + const char *killableStr = pTrans->ableToBeKilled ? "yes" : "no"; + char killableVstr[10 + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(killableVstr, killableStr, 24); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)killableVstr, false); + + const char *killModeStr = pTrans->killMode == TRN_KILL_MODE_SKIP ? "skip" : "interrupt"; + char killModeVstr[10 + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(killModeVstr, killModeStr, 24); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)killModeVstr, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false), pTrans, &lino, _OVER); @@ -2052,7 +2231,6 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false), pTrans, &lino, _OVER); - char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; char detail[TSDB_TRANS_ERROR_LEN + 1] = {0}; int32_t len = tsnprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction, pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo)); @@ -2061,13 +2239,17 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl len += tsnprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ", TMSG_INFO(pTrans->lastMsgType), epset.numOfEps, epset.inUse); for (int32_t i = 0; i < pTrans->lastEpset.numOfEps; ++i) { - len += tsnprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port); + len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u \n", i, epset.eps[i].fqdn, epset.eps[i].port); } } + + char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(lastInfo, detail, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)lastInfo, false), pTrans, &lino, _OVER); + mndTransLogAction(pTrans); + numOfRows++; sdbRelease(pSdb, pTrans); } @@ -2078,6 +2260,241 @@ _OVER: return numOfRows; } +static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, + int32_t transactionId, int32_t curActionId, int32_t numOfRows, int32_t *cols) { + int32_t code = 0; + int32_t lino = 0; + int32_t len = 0; + + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&transactionId, false), &lino, _OVER); + + char action[30 + 1] = {0}; + if (curActionId == pAction->id) { + len += snprintf(action + len, sizeof(action) - len, "%s:%d(%s)<-last", mndTransStr(pAction->stage), pAction->id, + mndTransTypeStr(pAction->actionType)); + } else { + len += snprintf(action + len, sizeof(action) - len, "%s:%d(%s)", mndTransStr(pAction->stage), pAction->id, + mndTransTypeStr(pAction->actionType)); + } + char actionVStr[30 + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(actionVStr, action, pShow->pMeta->pSchemas[*cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)actionVStr, false), &lino, _OVER); +_OVER: + if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code)); + return code; +} + +static int32_t mndShowTransAction(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, int32_t transactionId, + int32_t curActionId, int32_t rows, int32_t numOfRows) { + int32_t code = 0; + int32_t lino = 0; + int32_t len = 0; + int32_t cols = 0; + + cols = 0; + + mndShowTransCommonColumns(pShow, pBlock, pAction, transactionId, curActionId, numOfRows, &cols); + + if (pAction->actionType == TRANS_ACTION_MSG) { + int32_t len = 0; + + char objType[TSDB_TRANS_OBJTYPE_LEN + 1] = {0}; + len += snprintf(objType + len, sizeof(objType) - len, "%s(s:%d,r:%d)", TMSG_INFO(pAction->msgType), + pAction->msgSent, pAction->msgReceived); + char objTypeVStr[TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER); + + char result[TSDB_TRANS_RESULT_LEN + 1] = {0}; + len = 0; + len += snprintf(result + len, sizeof(result) - len, "errCode:0x%x(%s)", pAction->errCode & 0xFFFF, + tstrerror(pAction->errCode)); + char resultVStr[TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER); + + char target[TSDB_TRANS_TARGET_LEN] = {0}; + len = 0; + SEpSet epset = pAction->epSet; + if (epset.numOfEps > 0) { + for (int32_t i = 0; i < epset.numOfEps; ++i) { + len += snprintf(target + len, sizeof(target) - len, "ep:%d-%s:%u,", i, epset.eps[i].fqdn, epset.eps[i].port); + } + len += snprintf(target + len, sizeof(target) - len, "(%d:%d) ", epset.numOfEps, epset.inUse); + } + char targetVStr[TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(targetVStr, target, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)targetVStr, false), &lino, _OVER); + + char detail[TSDB_TRANS_DETAIL_LEN] = {0}; + len = 0; + char bufStart[40] = {0}; + if (pAction->startTime > 0) + taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI); + char bufEnd[40] = {0}; + if (pAction->endTime > 0) taosFormatUtcTime(bufEnd, sizeof(bufEnd), pAction->endTime, TSDB_TIME_PRECISION_MILLI); + len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd); + char detailVStr[TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER); + + } else { + int32_t len = 0; + + char objType[TSDB_TRANS_OBJTYPE_LEN + 1] = {0}; + if (pAction->pRaw->type == SDB_VGROUP) { + SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw); + SVgObj *pVgroup = sdbGetRowObj(pRow); + len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type), pVgroup->vgId); + taosMemoryFreeClear(pRow); + } else { + strcpy(objType, sdbTableName(pAction->pRaw->type)); + } + char objTypeVStr[TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER); + + char result[TSDB_TRANS_RESULT_LEN + 1] = {0}; + len = 0; + len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten); + char resultVStr[TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER); + + char target[TSDB_TRANS_TARGET_LEN] = ""; + char targetVStr[TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(targetVStr, target, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)targetVStr, false), &lino, _OVER); + + char detail[TSDB_TRANS_DETAIL_LEN] = {0}; + len = 0; + len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status)); + char detailVStr[TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER); + } + +_OVER: + if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code)); + return code; +} + +static SArray *mndTransGetAction(STrans *pTrans, ETrnStage stage) { + if (stage == TRN_STAGE_PREPARE) { + return pTrans->prepareActions; + } + if (stage == TRN_STAGE_REDO_ACTION) { + return pTrans->redoActions; + } + if (stage == TRN_STAGE_COMMIT_ACTION) { + return pTrans->commitActions; + } + if (stage == TRN_STAGE_UNDO_ACTION) { + return pTrans->undoActions; + } + return NULL; +} + +typedef struct STransDetailIter { + void *pIter; + STrans *pTrans; + ETrnStage stage; + int32_t num; +} STransDetailIter; + +static void mndTransShowActions(SSdb *pSdb, STransDetailIter *pShowIter, SShowObj *pShow, SSDataBlock *pBlock, + int32_t rows, int32_t *numOfRows, SArray *pActions, int32_t end, int32_t start) { + int32_t actionNum = taosArrayGetSize(pActions); + mInfo("stage:%s, Actions num:%d", mndTransStr(pShowIter->stage), actionNum); + + for (int32_t i = start; i < actionNum; ++i) { + STransAction *pAction = taosArrayGet(pShowIter->pTrans->redoActions, i); + mndShowTransAction(pShow, pBlock, pAction, pShowIter->pTrans->id, pShowIter->pTrans->lastAction, rows, *numOfRows); + (*numOfRows)++; + if (*numOfRows >= rows) break; + } + + if (*numOfRows == end) { + sdbRelease(pSdb, pShowIter->pTrans); + pShowIter->pTrans = NULL; + pShowIter->num = 0; + } else { + pShowIter->pTrans = pShowIter->pTrans; + pShowIter->stage = pShowIter->pTrans->stage; + pShowIter->num += (*numOfRows); + } +} + +static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + + int32_t code = 0; + int32_t lino = 0; + + mInfo("start to mndRetrieveTransDetail, rows:%d, pShow->numOfRows:%d, pShow->pIter:%p", rows, pShow->numOfRows, + pShow->pIter); + + if (pShow->pIter == NULL) { + pShow->pIter = taosMemoryMalloc(sizeof(STransDetailIter)); + if (pShow->pIter == NULL) { + mError("failed to malloc for pShow->pIter"); + return 0; + } + memset(pShow->pIter, 0, sizeof(STransDetailIter)); + } + + STransDetailIter *pShowIter = (STransDetailIter *)pShow->pIter; + + while (numOfRows < rows) { + if (pShowIter->pTrans == NULL) { + pShowIter->pIter = sdbFetch(pSdb, SDB_TRANS, pShowIter->pIter, (void **)&(pShowIter->pTrans)); + mDebug("retrieve trans detail from fetch, pShow->pIter:%p, pTrans:%p", pShowIter->pIter, pShowIter->pTrans); + if (pShowIter->pIter == NULL) break; + mInfo("retrieve trans detail from fetch, id:%d, trans stage:%d, IterNum:%d", pShowIter->pTrans->id, + pShowIter->pTrans->stage, pShowIter->num); + + SArray *pActions = mndTransGetAction(pShowIter->pTrans, pShowIter->pTrans->stage); + + mndTransShowActions(pSdb, pShowIter, pShow, pBlock, rows, &numOfRows, pActions, taosArrayGetSize(pActions), 0); + break; + } else { + mInfo("retrieve trans detail from iter, id:%d, iterStage:%d, IterNum:%d", pShowIter->pTrans->id, pShowIter->stage, + pShowIter->num); + SArray *pActions = mndTransGetAction(pShowIter->pTrans, pShowIter->stage); + + mndTransShowActions(pSdb, pShowIter, pShow, pBlock, rows, &numOfRows, pActions, + taosArrayGetSize(pActions) - pShowIter->num, pShowIter->num); + break; + } + } + +_OVER: + pShow->numOfRows += numOfRows; + + if (code != 0) { + mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code)); + } else { + mInfo("retrieve trans detail, numOfRows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows) + } + if (numOfRows == 0) { + taosMemoryFree(pShow->pIter); + pShow->pIter = NULL; + } + return numOfRows; +} + static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetchByType(pSdb, pIter, SDB_TRANS); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 389b04b1d6..122d426e9f 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2766,12 +2766,14 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb mndTransSetSerial(pTrans); - if (pNewVgroup->replica == 1 && pNewDb->cfg.replications == 3) { + if (pNewDb->cfg.replications == 3) { mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId); // add second - TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); + if (pNewVgroup->replica == 1){ + TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); + } // learner stage pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; @@ -2790,7 +2792,9 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); // add third - TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); + if (pNewVgroup->replica == 2){ + TAOS_CHECK_RETURN (mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray)); + } pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER; pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER; @@ -2802,7 +2806,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2])); TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); - } else if (pNewVgroup->replica == 3 && pNewDb->cfg.replications == 1) { + } else if (pNewDb->cfg.replications == 1) { mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId); @@ -2819,9 +2823,9 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2)); TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true)); TAOS_CHECK_RETURN( - mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId)); + mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId)); TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup)); - } else if (pNewVgroup->replica == 1 && pNewDb->cfg.replications == 2) { + } else if (pNewDb->cfg.replications == 2) { mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 13ae220116..175379d8df 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2543,7 +2543,8 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) if (pInfo->showRewrite) { getDBNameFromCondition(pInfo->pCondition, dbName); if (strncasecmp(name, TSDB_INS_TABLE_COMPACTS, TSDB_TABLE_FNAME_LEN) != 0 && - strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, TSDB_TABLE_FNAME_LEN) != 0) { + strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, TSDB_TABLE_FNAME_LEN) != 0 && + strncasecmp(name, TSDB_INS_TABLE_TRANSACTION_DETAILS, TSDB_TABLE_FNAME_LEN) != 0) { TAOS_UNUSED(tsnprintf(pInfo->req.db, sizeof(pInfo->req.db), "%d.%s", pInfo->accountId, dbName)); } } else if (strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 6d4d89607f..c2290b9eac 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -292,6 +292,8 @@ const char* nodesNodeName(ENodeType type) { return "ShowCompactsStmt"; case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT: return "ShowCompactDetailsStmt"; + case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT: + return "ShowTransactionDetailsStmt"; case QUERY_NODE_SHOW_GRANTS_FULL_STMT: return "ShowGrantsFullStmt"; case QUERY_NODE_SHOW_GRANTS_LOGS_STMT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7beaeaa46c..5bfc35aa9c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -714,6 +714,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) { break; case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT: code = makeNode(type, sizeof(SShowCompactDetailsStmt), &pNode); + break; + case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT: + code = makeNode(type, sizeof(SShowTransactionDetailsStmt), &pNode); break; case QUERY_NODE_KILL_QUERY_STMT: code = makeNode(type, sizeof(SKillQueryStmt), &pNode); @@ -1562,6 +1565,11 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pStmt->pCompactId); break; } + case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT: { + SShowTransactionDetailsStmt* pStmt = (SShowTransactionDetailsStmt*)pNode; + nodesDestroyNode(pStmt->pTransactionId); + break; + } case QUERY_NODE_SHOW_CREATE_DATABASE_STMT: taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg); break; diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index e69a3da4a9..8b2a98baf2 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -320,6 +320,7 @@ SNode* createCreateViewStmt(SAstCreateContext* pCxt, bool orReplace, SNode* pVie SNode* createDropViewStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pView); SNode* createShowCompactDetailsStmt(SAstCreateContext* pCxt, SNode* pCompactIdNode); SNode* createShowCompactsStmt(SAstCreateContext* pCxt, ENodeType type); +SNode* createShowTransactionDetailsStmt(SAstCreateContext* pCxt, SNode* pTransactionIdNode); SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* tsmaName, SNode* pOptions, SNode* pRealTable, SNode* pInterval); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 63eb09d509..38e93a7ce3 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -594,6 +594,7 @@ cmd ::= SHOW BNODES. cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT); } cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT); } cmd ::= SHOW TRANSACTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TRANSACTIONS_STMT); } +cmd ::= SHOW TRANSACTION NK_INTEGER(A). { pCxt->pRootNode = createShowTransactionDetailsStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A)); } cmd ::= SHOW TABLE DISTRIBUTED full_table_name(A). { pCxt->pRootNode = createShowTableDistributedStmt(pCxt, A); } cmd ::= SHOW CONSUMERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONSUMERS_STMT); } cmd ::= SHOW SUBSCRIPTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index a13472620b..4fc83c7c41 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -2931,6 +2931,18 @@ _err: return NULL; } +SNode* createShowTransactionDetailsStmt(SAstCreateContext* pCxt, SNode* pTransactionIdNode) { + CHECK_PARSER_STATUS(pCxt); + SShowTransactionDetailsStmt* pStmt = NULL; + pCxt->errCode = nodesMakeNode(QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT, (SNode**)&pStmt); + CHECK_MAKE_NODE(pStmt); + pStmt->pTransactionId = pTransactionIdNode; + return (SNode*)pStmt; +_err: + nodesDestroyNode(pTransactionIdNode); + return NULL; +} + static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange) { int32_t code = TSDB_CODE_SUCCESS; char* ipCopy = taosStrdup(ipRange); diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 1687916cb0..e876575f48 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -770,6 +770,12 @@ static int32_t collectMetaKeyFromShowCompactDetails(SCollectMetaKeyCxt* pCxt, SS return code; } +static int32_t collectMetaKeyFromShowTransactionDetails(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { + int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, + TSDB_INS_TABLE_TRANSACTION_DETAILS, pCxt->pMetaCache); + return code; +} + static int32_t collectMetaKeyFromShowGrantsFull(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_GRANTS_FULL, pCxt->pMetaCache); @@ -1094,6 +1100,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowCompacts(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT: return collectMetaKeyFromShowCompactDetails(pCxt, (SShowStmt*)pStmt); + case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT: + return collectMetaKeyFromShowTransactionDetails(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_GRANTS_FULL_STMT: return collectMetaKeyFromShowGrantsFull(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_SHOW_GRANTS_LOGS_STMT: diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f191080512..f20755ad93 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -320,6 +320,12 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = { .numOfShowCols = 1, .pShowCols = {"*"} }, + { .showType = QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT, + .pDbName = TSDB_INFORMATION_SCHEMA_DB, + .pTableName = TSDB_INS_TABLE_TRANSACTION_DETAILS, + .numOfShowCols = 1, + .pShowCols = {"*"} + }, { .showType = QUERY_NODE_SHOW_GRANTS_FULL_STMT, .pDbName = TSDB_INFORMATION_SCHEMA_DB, .pTableName = TSDB_INS_TABLE_GRANTS_FULL, @@ -16381,6 +16387,24 @@ static int32_t rewriteShowCompactDetailsStmt(STranslateContext* pCxt, SQuery* pQ return code; } +static int32_t rewriteShowTransactionDetailsStmt(STranslateContext* pCxt, SQuery* pQuery) { + SShowTransactionDetailsStmt* pShow = (SShowTransactionDetailsStmt*)(pQuery->pRoot); + SSelectStmt* pStmt = NULL; + int32_t code = createSelectStmtForShow(QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT, &pStmt); + if (TSDB_CODE_SUCCESS == code) { + if (NULL != pShow->pTransactionId) { + code = createOperatorNode(OP_TYPE_EQUAL, "transaction_id", pShow->pTransactionId, &pStmt->pWhere); + } + } + if (TSDB_CODE_SUCCESS == code) { + pCxt->showRewrite = true; + pQuery->showRewrite = true; + nodesDestroyNode(pQuery->pRoot); + pQuery->pRoot = (SNode*)pStmt; + } + return code; +} + static int32_t createParWhenThenNode(SNode* pWhen, SNode* pThen, SNode** ppResWhenThen) { SWhenThenNode* pWThen = NULL; int32_t code = nodesMakeNode(QUERY_NODE_WHEN_THEN, (SNode**)&pWThen); @@ -16956,6 +16980,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT: code = rewriteShowCompactDetailsStmt(pCxt, pQuery); break; + case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT: + code = rewriteShowTransactionDetailsStmt(pCxt, pQuery); + break; case QUERY_NODE_SHOW_DB_ALIVE_STMT: case QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT: code = rewriteShowAliveStmt(pCxt, pQuery); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 195cb21618..c9c0b7a971 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -326,6 +326,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While ex TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CTX_SWITCH, "Wrong transaction execution context") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT_COMPACT, "Transaction not completed due to conflict with compact") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED, "The transaction is not able to be killed") // mnode-mq TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4aedc0991e..01d7db54ae 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -398,11 +398,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/empty_identifier.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_transaction_detail.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/persisit_config.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/qmemCtrl.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact_vgroups.py - - ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_delete.py diff --git a/tests/system-test/0-others/kill_balance_leader.py b/tests/system-test/0-others/kill_balance_leader.py new file mode 100644 index 0000000000..be86336661 --- /dev/null +++ b/tests/system-test/0-others/kill_balance_leader.py @@ -0,0 +1,64 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +from util.log import * +from util.cases import * +from util.dnodes import * +from util.sql import * + + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to init {__file__}") + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), logSql) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + def run(self): + tdLog.debug(f"start to excute {__file__}") + + tdSql.execute('CREATE DATABASE db vgroups 160 replica 3;') + + tdSql.execute('balance vgroup leader') + + sql ="show transactions;" + rows = tdSql.query(sql) + + if rows > 0: + tranId = tdSql.getData(0, 0) + tdLog.info('kill transaction %d'%tranId) + tdSql.execute('kill transaction %d'%tranId, queryTimes=1 ) + + if self.waitTransactionZero() is False: + tdLog.exit(f"{sql} transaction not finished") + return False + + def waitTransactionZero(self, seconds = 300, interval = 1): + # wait end + for i in range(seconds): + sql ="show transactions;" + rows = tdSql.query(sql) + if rows == 0: + tdLog.info("transaction count became zero.") + return True + #tdLog.info(f"i={i} wait ...") + time.sleep(interval) + + return False + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/0-others/show_transaction_detail.py b/tests/system-test/0-others/show_transaction_detail.py new file mode 100644 index 0000000000..7b61b29ce7 --- /dev/null +++ b/tests/system-test/0-others/show_transaction_detail.py @@ -0,0 +1,105 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +from util.log import * +from util.cases import * +from util.dnodes import * +from util.sql import * +from util.cluster import * +import threading + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to init {__file__}") + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), logSql) + self.dnodes = cluster.dnodes + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + def run(self): + tdLog.debug(f"start to excute {__file__}") + + tdLog.info("CREATE DATABASE db1 vgroups 16 replica 1;") + tdSql.execute('CREATE DATABASE db1 vgroups 16 replica 1;') + + if self.waitTransactionZero() is False: + tdLog.exit(f"{sql} transaction not finished") + return False + + newTdSql1=tdCom.newTdSql() + t1 = threading.Thread(target=self.alterDbThread, args=('', newTdSql1)) + + newTdSql2=tdCom.newTdSql() + t2 = threading.Thread(target=self.createDbThread, args=('', newTdSql2)) + + t1.start() + t2.start() + + dnode = self.dnodes[1] + + # stop dnode + tdLog.info(f"stop dnode 1") + dnode.stoptaosd() + + rows = tdSql.query("show transactions;") + if rows > 0: + tranId = tdSql.getData(0, 0) + tdLog.info(f"show transaction {tranId}") + rows = tdSql.query(f"show transaction {tranId}", queryTimes=1) + + if rows != 160: + tdLog.exit(f"restore transaction detial error, rows={rows}") + return False + + rows = tdSql.query("show transactions;") + if rows > 0: + tranId = tdSql.getData(1, 0) + tdLog.info(f"show transaction {tranId}") + rows = tdSql.query(f"show transaction {tranId}", queryTimes=1) + + if rows != 176: + tdLog.exit(f"restore transaction detial error, rows={rows}") + return False + + tdLog.info(f"select * from ins_transaction_details") + rows = tdSql.query(f"select * from information_schema.ins_transaction_details", queryTimes=1) + + if rows != 336: + tdLog.exit(f"restore transaction detial error, rows={rows}") + return False + + def createDbThread(self, sql, newTdSql): + tdLog.info("CREATE DATABASE db2 vgroups 160 replica 1;") + newTdSql.execute('CREATE DATABASE db2 vgroups 160 replica 1;') + + def alterDbThread(self, sql, newTdSql): + tdLog.info("alter DATABASE db1 replica 3;") + newTdSql.execute('alter DATABASE db1 replica 3;') + + def waitTransactionZero(self, seconds = 300, interval = 1): + # wait end + for i in range(seconds): + sql ="show transactions;" + rows = tdSql.query(sql) + if rows == 0: + tdLog.info("transaction count became zero.") + return True + #tdLog.info(f"i={i} wait ...") + time.sleep(interval) + + return False + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/3-enterprise/restore/kill_restore_dnode.py b/tests/system-test/3-enterprise/restore/kill_restore_dnode.py new file mode 100644 index 0000000000..ed1c6c21fc --- /dev/null +++ b/tests/system-test/3-enterprise/restore/kill_restore_dnode.py @@ -0,0 +1,90 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys + +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +sys.path.append("./3-enterprise/restore") +from restoreBasic import * +from util.common import tdCom +import threading + + +class TDTestCase: + # init + def init(self, conn, logSql, replicaVar=1): + tdLog.debug("start to execute %s" % __file__) + self.basic = RestoreBasic() + self.basic.init(conn, logSql, replicaVar) + + # run + def run(self): + self.basic.restore_dnode_prepare(2) + + self.execute() + + def execute(self): + newTdSql=tdCom.newTdSql() + t0 = threading.Thread(target=self.restoreDnodeThread, args=('', newTdSql)) + t0.start() + + time.sleep(2) + sql ="show transactions;" + tdLog.info(sql) + rows = tdSql.query(sql) + + if rows > 0: + self.basic.stop_dnode(2) + + tranId = tdSql.getData(0, 0) + + tdLog.info('show transaction %d'%tranId) + rows=tdSql.query('show transaction %d'%tranId, queryTimes=1) + if rows != 13: + tdLog.exit(f"restore transaction detial error, rows={rows}") + return False + + tdLog.info('kill transaction %d'%tranId) + tdSql.execute('kill transaction %d'%tranId, queryTimes=1 ) + + time.sleep(3) + sql ="show transactions;" + tdLog.info(sql) + rows = tdSql.query(sql) + if rows > 0: + tdLog.info(f"{sql} transaction not finished") + return False + + self.basic.restore_dnode_exec(2) + else: + tdLog.exit(f"{sql} no transaction exist") + return False + + def restoreDnodeThread(self, p, newTdSql): + sleep(1) + + sql = f"restore dnode 2" + tdLog.info(sql) + newTdSql.error(sql, expectErrInfo="Wrong transaction execution context") + tdLog.info(f"{sql} finished") + + # stop + def stop(self): + self.basic.stop() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/3-enterprise/restore/restoreBasic.py b/tests/system-test/3-enterprise/restore/restoreBasic.py index 77fa606b9c..74cf572018 100644 --- a/tests/system-test/3-enterprise/restore/restoreBasic.py +++ b/tests/system-test/3-enterprise/restore/restoreBasic.py @@ -143,6 +143,34 @@ class RestoreBasic: tdSql.execute(sql) self.check_corrent() + def restore_dnode_prepare(self, index): + tdLog.info(f"start restore dnode {index}") + dnode = self.dnodes[index - 1] + + # stop dnode + tdLog.info(f"stop dnode {index}") + dnode.stoptaosd() + + # remove dnode folder + try: + shutil.rmtree(dnode.dataDir) + tdLog.info(f"delete dir {dnode.dataDir} successful") + except OSError as x: + tdLog.exit(f"remove path {dnode.dataDir} error : {x.strerror}") + + dnode.starttaosd() + + def restore_dnode_exec(self, index): + # exec restore + sql = f"restore dnode {index}" + tdLog.info(sql) + tdSql.execute(sql) + self.check_corrent() + + def stop_dnode(self, index): + dnode = self.dnodes[index - 1] + + dnode.starttaosd() # restore vnode def restore_vnode(self, index): tdLog.info(f"start restore vnode on dnode {index}")