Merge pull request #29148 from taosdata/dmchen/trans-improve

merge change
This commit is contained in:
dongming chen 2024-12-16 10:34:50 +08:00 committed by GitHub
commit 7fee0a076a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 845 additions and 31 deletions

View File

@ -63,6 +63,7 @@ extern "C" {
#define TSDB_INS_TABLE_TSMAS "ins_tsmas"
#define TSDB_INS_DISK_USAGE "ins_disk_usage"
#define TSDB_INS_TABLE_FILESETS "ins_filesets"
#define TSDB_INS_TABLE_TRANSACTION_DETAILS "ins_transaction_details"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "perf_smas"

View File

@ -163,6 +163,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_ANODE_FULL,
TSDB_MGMT_TABLE_USAGE,
TSDB_MGMT_TABLE_FILESETS,
TSDB_MGMT_TABLE_TRANSACTION_DETAIL,
TSDB_MGMT_TABLE_MAX,
} EShowType;
@ -393,6 +394,7 @@ typedef enum ENodeType {
QUERY_NODE_SHOW_VIEWS_STMT,
QUERY_NODE_SHOW_COMPACTS_STMT,
QUERY_NODE_SHOW_COMPACT_DETAILS_STMT,
QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT,
QUERY_NODE_SHOW_GRANTS_FULL_STMT,
QUERY_NODE_SHOW_GRANTS_LOGS_STMT,
QUERY_NODE_SHOW_CLUSTER_MACHINES_STMT,

View File

@ -439,6 +439,11 @@ typedef struct SShowCompactDetailsStmt {
SNode* pCompactId;
} SShowCompactDetailsStmt;
typedef struct SShowTransactionDetailsStmt {
ENodeType type;
SNode* pTransactionId;
} SShowTransactionDetailsStmt;
typedef enum EIndexType { INDEX_TYPE_SMA = 1, INDEX_TYPE_FULLTEXT, INDEX_TYPE_NORMAL } EIndexType;
typedef struct SIndexOptions {

View File

@ -411,6 +411,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_MND_TRANS_CTX_SWITCH TAOS_DEF_ERROR_CODE(0, 0x03D8)
#define TSDB_CODE_MND_TRANS_CONFLICT_COMPACT TAOS_DEF_ERROR_CODE(0, 0x03D9)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
#define TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED TAOS_DEF_ERROR_CODE(0, 0x03D2)
// mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)

View File

@ -329,7 +329,11 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 512
#define TSDB_TRANS_ERROR_LEN 512
#define TSDB_TRANS_OBJTYPE_LEN 40
#define TSDB_TRANS_RESULT_LEN 100
#define TSDB_TRANS_TARGET_LEN 300
#define TSDB_TRANS_DETAIL_LEN 100
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128

View File

@ -314,6 +314,8 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "oper", .bytes = TSDB_TRANS_OPER_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stable", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "killable", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "kill_mnode", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
@ -403,6 +405,15 @@ static const SSysDbTableSchema userCompactsDetailSchema[] = {
{.name = "remain_time(s)", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false},
};
static const SSysDbTableSchema userTransactionDetailSchema[] = {
{.name = "transaction_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "action", .bytes = 30 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "obj_type", .bytes = TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "result", .bytes = TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "target", .bytes = TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "detail", .bytes = TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema anodesSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "url", .bytes = TSDB_ANALYTIC_ANODE_URL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
@ -521,6 +532,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_ANODES_FULL, anodesFullSchema, tListLen(anodesFullSchema), true},
{TSDB_INS_DISK_USAGE, diskUsageSchema, tListLen(diskUsageSchema), false},
{TSDB_INS_TABLE_FILESETS, filesetsFullSchema, tListLen(filesetsFullSchema), false},
{TSDB_INS_TABLE_TRANSACTION_DETAILS, userTransactionDetailSchema, tListLen(userTransactionDetailSchema), false},
};
static const SSysDbTableSchema connectionsSchema[] = {

View File

@ -703,6 +703,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
if (tDeserializeSRetrieveTableReq(pMsg->pCont, pMsg->contLen, &retrieveReq) != 0) {
return TSDB_CODE_INVALID_MSG;
}
dInfo("retrieve table:%s, user:%s, compactId:%" PRId64, retrieveReq.tb, retrieveReq.user, retrieveReq.compactId);
#if 0
if (strcmp(retrieveReq.user, TSDB_DEFAULT_USER) != 0) {
code = TSDB_CODE_MND_NO_RIGHTS;

View File

@ -133,6 +133,12 @@ typedef enum {
TRN_EXEC_SERIAL = 1,
} ETrnExec;
typedef enum {
TRN_KILL_MODE_SKIP = 0,
TRN_KILL_MODE_INTERUPT = 1,
//TRN_KILL_MODE_ROLLBACK = 2,
} ETrnKillMode;
typedef enum {
DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT,
@ -201,6 +207,8 @@ typedef struct {
SRWLatch lockRpcArray;
int64_t mTraceId;
TdThreadMutex mutex;
bool ableToBeKilled;
ETrnKillMode killMode;
} STrans;
typedef struct {

View File

@ -54,6 +54,8 @@ typedef struct {
SSdbRaw *pRaw;
int64_t mTraceId;
int64_t startTime;
int64_t endTime;
} STransAction;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
@ -80,6 +82,8 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, voi
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId);
void mndTransSetSerial(STrans *pTrans);
void mndTransSetBeKilled(STrans *pTrans, bool ableToBeKilled);
void mndTransSetKillMode(STrans *pTrans, ETrnKillMode killMode);
void mndTransSetParallel(STrans *pTrans);
void mndTransSetChangeless(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper);

View File

@ -45,6 +45,8 @@ int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
char *sep = NULL;
SDbObj *pDb = NULL;
mInfo("retrieve compact detail");
if (strlen(pShow->db) > 0) {
sep = strchr(pShow->db, '.');
if (sep &&

View File

@ -1017,7 +1017,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER);
TAOS_CHECK_GOTO(mndCreateDb(pMnode, pReq, &createReq, pUser, dnodeList), &lino, _OVER);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
//if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
if (tNameFromString(&name, createReq.db, T_NAME_ACCT | T_NAME_DB) < 0)
@ -1286,6 +1286,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
TAOS_RETURN(code);
}
mInfo("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
mInfo("trans:%d, used to alter db, ableToBeKilled:%d, killMode:%d", pTrans->id, pTrans->ableToBeKilled, pTrans->killMode);
mndTransSetDbName(pTrans, pOld->name, NULL);
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
@ -1294,6 +1295,8 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew), NULL, _OVER);
mInfo("trans:%d, used to alter db, ableToBeKilled:%d, killMode:%d", pTrans->id, pTrans->ableToBeKilled, pTrans->killMode);
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
code = 0;

View File

@ -132,6 +132,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_COMPACT;
} else if (strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, len) == 0) {
type = TSDB_MGMT_TABLE_COMPACT_DETAIL;
} else if (strncasecmp(name, TSDB_INS_TABLE_TRANSACTION_DETAILS, len) == 0) {
type = TSDB_MGMT_TABLE_TRANSACTION_DETAIL;
} else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_FULL, len) == 0) {
type = TSDB_MGMT_TABLE_GRANTS_FULL;
} else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_LOGS, len) == 0) {
@ -236,7 +238,8 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
SRetrieveTableReq retrieveReq = {0};
TAOS_CHECK_RETURN(tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq));
mDebug("process to retrieve systable req db:%s, tb:%s", retrieveReq.db, retrieveReq.tb);
mDebug("process to retrieve systable req db:%s, tb:%s, compactId:%" PRId64, retrieveReq.db, retrieveReq.tb,
retrieveReq.compactId);
if (retrieveReq.showId == 0) {
STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));

View File

@ -22,9 +22,11 @@
#include "mndSync.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#define TRANS_VER1_NUMBER 1
#define TRANS_VER2_NUMBER 2
#define TRANS_VER3_NUMBER 3
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 44
@ -70,7 +72,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq);
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t tsMaxTransId = 0;
int32_t mndInitTrans(SMnode *pMnode) {
@ -89,6 +91,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndCancelGetNextTrans);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANSACTION_DETAIL, mndRetrieveTransDetail);
return sdbSetTable(pMnode->pSdb, table);
}
@ -156,7 +159,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
int32_t code = 0;
int32_t lino = 0;
terrno = TSDB_CODE_INVALID_MSG;
int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER;
int8_t sver = TRANS_VER3_NUMBER;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
rawDataLen += mndTransGetActionsSize(pTrans->prepareActions);
@ -220,6 +223,11 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
pIter = taosHashIterate(pTrans->arbGroupIds, pIter);
}
if (sver > TRANS_VER1_NUMBER) {
SDB_SET_INT8(pRaw, dataPos, pTrans->ableToBeKilled, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->killMode, _OVER)
}
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
@ -310,7 +318,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TRANS_VER1_NUMBER && sver != TRANS_VER2_NUMBER) {
if (sver > TRANS_VER3_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
@ -389,6 +397,15 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
if ((terrno = taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0)) != 0) goto _OVER;
}
if (sver > TRANS_VER2_NUMBER) {
int8_t ableKill = 0;
int8_t killMode = 0;
SDB_GET_INT8(pRaw, dataPos, &ableKill, _OVER)
SDB_GET_INT8(pRaw, dataPos, &killMode, _OVER)
pTrans->ableToBeKilled = ableKill;
pTrans->killMode = killMode;
}
SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
terrno = 0;
@ -430,12 +447,25 @@ static const char *mndTransStr(ETrnStage stage) {
}
}
static const char *mndTransTypeStr(ETrnAct actionType) {
switch (actionType) {
case TRANS_ACTION_MSG:
return "msg";
case TRANS_ACTION_RAW:
return "sdb";
default:
return "invalid";
}
}
static void mndSetTransLastAction(STrans *pTrans, STransAction *pAction) {
if (pAction != NULL) {
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo = pAction->errCode;
if (pAction->errCode != TSDB_CODE_ACTION_IN_PROGRESS) {
pTrans->lastAction = pAction->id;
pTrans->lastMsgType = pAction->msgType;
pTrans->lastEpset = pAction->epSet;
pTrans->lastErrorNo = pAction->errCode;
}
} else {
pTrans->lastAction = 0;
pTrans->lastMsgType = 0;
@ -636,6 +666,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->policy = policy;
pTrans->conflict = conflict;
pTrans->exec = TRN_EXEC_PARALLEL;
pTrans->ableToBeKilled = false;
pTrans->createdTime = taosGetTimestampMs();
pTrans->prepareActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
@ -804,6 +835,13 @@ void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) {
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
void mndTransSetBeKilled(STrans *pTrans, bool ableToBeKilled) { pTrans->ableToBeKilled = ableToBeKilled; }
void mndTransSetKillMode(STrans *pTrans, ETrnKillMode killMode) {
pTrans->ableToBeKilled = true;
pTrans->killMode = killMode;
}
void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; }
@ -1043,6 +1081,39 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return TSDB_CODE_INVALID_PARA;
}
mInfo("trans:%d, action list:", pTrans->id);
int32_t index = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTrans->prepareActions); ++i, ++index) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, i);
mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index,
mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
}
for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i, ++index) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
mInfo("trans:%d, action:%d, %s:%d msgType:%s", pTrans->id, index,
mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));;
}
for (int32_t i = 0; i < taosArrayGetSize(pTrans->commitActions); ++i, ++index) {
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index,
mndTransStr(pAction->stage), i, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
}
for (int32_t i = 0; i < taosArrayGetSize(pTrans->undoActions); ++i, ++index) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
if(pAction->actionType == TRANS_ACTION_MSG){
mInfo("trans:%d, action:%d, %s:%d msgType:%s", pTrans->id, index,
mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));;
}
else{
mInfo("trans:%d, action:%d, %s:%d sdbType:%s, sdbStatus:%s", pTrans->id, index,
mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status));
}
}
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
TAOS_CHECK_RETURN(mndTransCheckParallelActions(pMnode, pTrans));
@ -1260,7 +1331,9 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
if (pAction != NULL) {
pAction->msgReceived = 1;
pAction->errCode = pRsp->code;
pTrans->lastErrorNo = pRsp->code;
pAction->endTime = taosGetTimestampMs();
// pTrans->lastErrorNo = pRsp->code;
mndSetTransLastAction(pTrans, pAction);
mInfo("trans:%d, %s:%d response is received, received code:0x%x(%s), accept:0x%x(%s) retry:0x%x(%s)", transId,
mndTransStr(pAction->stage), action, pRsp->code, tstrerror(pRsp->code), pAction->acceptableCode,
@ -1374,6 +1447,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
pAction->msgSent = 1;
// pAction->msgReceived = 0;
pAction->errCode = TSDB_CODE_ACTION_IN_PROGRESS;
pAction->startTime = taosGetTimestampMs();
pAction->endTime = 0;
mInfo("trans:%d, %s:%d is sent, %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, detail);
mndSetTransLastAction(pTrans, pAction);
@ -1527,8 +1602,9 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pActions, action);
mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d", pTrans->id, pTrans->actionPos,
mndTransStr(pAction->stage), pAction->actionType);
mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d, msgSent:%d, msgReceived:%d",
pTrans->id, pTrans->actionPos, mndTransStr(pAction->stage), pAction->actionType, pAction->msgSent,
pAction->msgReceived);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code == 0) {
@ -1924,13 +2000,25 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
TAOS_RETURN(TSDB_CODE_MND_TRANS_INVALID_STAGE);
}
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
pAction->msgSent = 1;
pAction->msgReceived = 1;
pAction->errCode = 0;
if(pTrans->ableToBeKilled == false){
return TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED;
}
if(pTrans->killMode == TRN_KILL_MODE_SKIP){
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
pAction->msgSent = 1;
pAction->msgReceived = 1;
pAction->errCode = 0;
}
}
else if(pTrans->killMode == TRN_KILL_MODE_INTERUPT){
pTrans->stage = TRN_STAGE_PRE_FINISH;
}
else{
return TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED;
}
mndTransExecute(pMnode, pTrans);
@ -2002,6 +2090,85 @@ void mndTransPullup(SMnode *pMnode) {
taosArrayDestroy(pArray);
}
static void mndTransLogAction(STrans *pTrans) {
char detail[512] = {0};
int32_t len = 0;
int32_t index = 0;
if (pTrans->stage == TRN_STAGE_PREPARE) {
for (int32_t i = 0; i < taosArrayGetSize(pTrans->prepareActions); ++i, ++index) {
len = 0;
STransAction *pAction = taosArrayGet(pTrans->prepareActions, i);
len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index,
mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type),
sdbStatusName(pAction->pRaw->status));
mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
}
}
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
for (int32_t i = 0; i < taosArrayGetSize(pTrans->redoActions); ++i, ++index) {
len = 0;
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
if (pAction->actionType == TRANS_ACTION_MSG) {
char bufStart[40] = {0};
taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI);
char endStart[40] = {0};
taosFormatUtcTime(endStart, sizeof(endStart), pAction->endTime, TSDB_TIME_PRECISION_MILLI);
len += snprintf(detail + len, sizeof(detail) - len,
"action:%d, %s:%d msgType:%s,"
"sent:%d, received:%d, startTime:%s, endTime:%s, ",
index, mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType), pAction->msgSent,
pAction->msgReceived, bufStart, endStart);
SEpSet epset = pAction->epSet;
if (epset.numOfEps > 0) {
len += snprintf(detail + len, sizeof(detail) - len, "numOfEps:%d inUse:%d ", epset.numOfEps, epset.inUse);
for (int32_t i = 0; i < epset.numOfEps; ++i) {
len +=
snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
}
}
len += snprintf(detail + len, sizeof(detail) - len, ", errCode:0x%x(%s)\n", pAction->errCode & 0xFFFF,
tstrerror(pAction->errCode));
} else {
len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s, written:%d\n",
index, mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type),
sdbStatusName(pAction->pRaw->status), pAction->rawWritten);
}
mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
}
}
if (pTrans->stage == TRN_STAGE_COMMIT_ACTION) {
for (int32_t i = 0; i < taosArrayGetSize(pTrans->commitActions); ++i, ++index) {
len = 0;
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index,
mndTransStr(pAction->stage), i, sdbTableName(pAction->pRaw->type),
sdbStatusName(pAction->pRaw->status));
mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
}
for (int32_t i = 0; i < taosArrayGetSize(pTrans->undoActions); ++i, ++index) {
len = 0;
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
if (pAction->actionType == TRANS_ACTION_MSG) {
len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d msgType:%s\n", index,
mndTransStr(pAction->stage), pAction->id, TMSG_INFO(pAction->msgType));
;
} else {
len += snprintf(detail + len, sizeof(detail) - len, "action:%d, %s:%d sdbType:%s, sdbStatus:%s\n", index,
mndTransStr(pAction->stage), pAction->id, sdbTableName(pAction->pRaw->type),
sdbStatusName(pAction->pRaw->status));
}
mDebug("trans:%d, show tran action, detail:%s", pTrans->id, detail);
}
}
}
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -2044,6 +2211,18 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)stbname, false), pTrans, &lino, _OVER);
const char *killableStr = pTrans->ableToBeKilled ? "yes" : "no";
char killableVstr[10 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(killableVstr, killableStr, 24);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)killableVstr, false);
const char *killModeStr = pTrans->killMode == TRN_KILL_MODE_SKIP ? "skip" : "interrupt";
char killModeVstr[10 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(killModeVstr, killModeStr, 24);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)killModeVstr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false), pTrans, &lino,
_OVER);
@ -2052,7 +2231,6 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false), pTrans, &lino,
_OVER);
char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
char detail[TSDB_TRANS_ERROR_LEN + 1] = {0};
int32_t len = tsnprintf(detail, sizeof(detail), "action:%d code:0x%x(%s) ", pTrans->lastAction,
pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo));
@ -2061,13 +2239,17 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
len += tsnprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ",
TMSG_INFO(pTrans->lastMsgType), epset.numOfEps, epset.inUse);
for (int32_t i = 0; i < pTrans->lastEpset.numOfEps; ++i) {
len += tsnprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u \n", i, epset.eps[i].fqdn, epset.eps[i].port);
}
}
char lastInfo[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(lastInfo, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
RETRIEVE_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)lastInfo, false), pTrans, &lino, _OVER);
mndTransLogAction(pTrans);
numOfRows++;
sdbRelease(pSdb, pTrans);
}
@ -2078,6 +2260,241 @@ _OVER:
return numOfRows;
}
static int32_t mndShowTransCommonColumns(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction,
int32_t transactionId, int32_t curActionId, int32_t numOfRows, int32_t *cols) {
int32_t code = 0;
int32_t lino = 0;
int32_t len = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)&transactionId, false), &lino, _OVER);
char action[30 + 1] = {0};
if (curActionId == pAction->id) {
len += snprintf(action + len, sizeof(action) - len, "%s:%d(%s)<-last", mndTransStr(pAction->stage), pAction->id,
mndTransTypeStr(pAction->actionType));
} else {
len += snprintf(action + len, sizeof(action) - len, "%s:%d(%s)", mndTransStr(pAction->stage), pAction->id,
mndTransTypeStr(pAction->actionType));
}
char actionVStr[30 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(actionVStr, action, pShow->pMeta->pSchemas[*cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, (*cols)++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)actionVStr, false), &lino, _OVER);
_OVER:
if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
return code;
}
static int32_t mndShowTransAction(SShowObj *pShow, SSDataBlock *pBlock, STransAction *pAction, int32_t transactionId,
int32_t curActionId, int32_t rows, int32_t numOfRows) {
int32_t code = 0;
int32_t lino = 0;
int32_t len = 0;
int32_t cols = 0;
cols = 0;
mndShowTransCommonColumns(pShow, pBlock, pAction, transactionId, curActionId, numOfRows, &cols);
if (pAction->actionType == TRANS_ACTION_MSG) {
int32_t len = 0;
char objType[TSDB_TRANS_OBJTYPE_LEN + 1] = {0};
len += snprintf(objType + len, sizeof(objType) - len, "%s(s:%d,r:%d)", TMSG_INFO(pAction->msgType),
pAction->msgSent, pAction->msgReceived);
char objTypeVStr[TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER);
char result[TSDB_TRANS_RESULT_LEN + 1] = {0};
len = 0;
len += snprintf(result + len, sizeof(result) - len, "errCode:0x%x(%s)", pAction->errCode & 0xFFFF,
tstrerror(pAction->errCode));
char resultVStr[TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER);
char target[TSDB_TRANS_TARGET_LEN] = {0};
len = 0;
SEpSet epset = pAction->epSet;
if (epset.numOfEps > 0) {
for (int32_t i = 0; i < epset.numOfEps; ++i) {
len += snprintf(target + len, sizeof(target) - len, "ep:%d-%s:%u,", i, epset.eps[i].fqdn, epset.eps[i].port);
}
len += snprintf(target + len, sizeof(target) - len, "(%d:%d) ", epset.numOfEps, epset.inUse);
}
char targetVStr[TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(targetVStr, target, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)targetVStr, false), &lino, _OVER);
char detail[TSDB_TRANS_DETAIL_LEN] = {0};
len = 0;
char bufStart[40] = {0};
if (pAction->startTime > 0)
taosFormatUtcTime(bufStart, sizeof(bufStart), pAction->startTime, TSDB_TIME_PRECISION_MILLI);
char bufEnd[40] = {0};
if (pAction->endTime > 0) taosFormatUtcTime(bufEnd, sizeof(bufEnd), pAction->endTime, TSDB_TIME_PRECISION_MILLI);
len += snprintf(detail + len, sizeof(detail) - len, "startTime:%s, endTime:%s, ", bufStart, bufEnd);
char detailVStr[TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER);
} else {
int32_t len = 0;
char objType[TSDB_TRANS_OBJTYPE_LEN + 1] = {0};
if (pAction->pRaw->type == SDB_VGROUP) {
SSdbRow *pRow = mndVgroupActionDecode(pAction->pRaw);
SVgObj *pVgroup = sdbGetRowObj(pRow);
len += snprintf(objType + len, sizeof(objType) - len, "%s(%d)", sdbTableName(pAction->pRaw->type), pVgroup->vgId);
taosMemoryFreeClear(pRow);
} else {
strcpy(objType, sdbTableName(pAction->pRaw->type));
}
char objTypeVStr[TSDB_TRANS_OBJTYPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(objTypeVStr, objType, pShow->pMeta->pSchemas[cols].bytes);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)objTypeVStr, false), &lino, _OVER);
char result[TSDB_TRANS_RESULT_LEN + 1] = {0};
len = 0;
len += snprintf(result + len, sizeof(result) - len, "rawWritten:%d", pAction->rawWritten);
char resultVStr[TSDB_TRANS_RESULT_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(resultVStr, result, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)resultVStr, false), &lino, _OVER);
char target[TSDB_TRANS_TARGET_LEN] = "";
char targetVStr[TSDB_TRANS_TARGET_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(targetVStr, target, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)targetVStr, false), &lino, _OVER);
char detail[TSDB_TRANS_DETAIL_LEN] = {0};
len = 0;
len += snprintf(detail + len, sizeof(detail) - len, "sdbStatus:%s", sdbStatusName(pAction->pRaw->status));
char detailVStr[TSDB_TRANS_DETAIL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(detailVStr, detail, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, (const char *)detailVStr, false), &lino, _OVER);
}
_OVER:
if (code != 0) mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
return code;
}
static SArray *mndTransGetAction(STrans *pTrans, ETrnStage stage) {
if (stage == TRN_STAGE_PREPARE) {
return pTrans->prepareActions;
}
if (stage == TRN_STAGE_REDO_ACTION) {
return pTrans->redoActions;
}
if (stage == TRN_STAGE_COMMIT_ACTION) {
return pTrans->commitActions;
}
if (stage == TRN_STAGE_UNDO_ACTION) {
return pTrans->undoActions;
}
return NULL;
}
typedef struct STransDetailIter {
void *pIter;
STrans *pTrans;
ETrnStage stage;
int32_t num;
} STransDetailIter;
static void mndTransShowActions(SSdb *pSdb, STransDetailIter *pShowIter, SShowObj *pShow, SSDataBlock *pBlock,
int32_t rows, int32_t *numOfRows, SArray *pActions, int32_t end, int32_t start) {
int32_t actionNum = taosArrayGetSize(pActions);
mInfo("stage:%s, Actions num:%d", mndTransStr(pShowIter->stage), actionNum);
for (int32_t i = start; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pShowIter->pTrans->redoActions, i);
mndShowTransAction(pShow, pBlock, pAction, pShowIter->pTrans->id, pShowIter->pTrans->lastAction, rows, *numOfRows);
(*numOfRows)++;
if (*numOfRows >= rows) break;
}
if (*numOfRows == end) {
sdbRelease(pSdb, pShowIter->pTrans);
pShowIter->pTrans = NULL;
pShowIter->num = 0;
} else {
pShowIter->pTrans = pShowIter->pTrans;
pShowIter->stage = pShowIter->pTrans->stage;
pShowIter->num += (*numOfRows);
}
}
static int32_t mndRetrieveTransDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t code = 0;
int32_t lino = 0;
mInfo("start to mndRetrieveTransDetail, rows:%d, pShow->numOfRows:%d, pShow->pIter:%p", rows, pShow->numOfRows,
pShow->pIter);
if (pShow->pIter == NULL) {
pShow->pIter = taosMemoryMalloc(sizeof(STransDetailIter));
if (pShow->pIter == NULL) {
mError("failed to malloc for pShow->pIter");
return 0;
}
memset(pShow->pIter, 0, sizeof(STransDetailIter));
}
STransDetailIter *pShowIter = (STransDetailIter *)pShow->pIter;
while (numOfRows < rows) {
if (pShowIter->pTrans == NULL) {
pShowIter->pIter = sdbFetch(pSdb, SDB_TRANS, pShowIter->pIter, (void **)&(pShowIter->pTrans));
mDebug("retrieve trans detail from fetch, pShow->pIter:%p, pTrans:%p", pShowIter->pIter, pShowIter->pTrans);
if (pShowIter->pIter == NULL) break;
mInfo("retrieve trans detail from fetch, id:%d, trans stage:%d, IterNum:%d", pShowIter->pTrans->id,
pShowIter->pTrans->stage, pShowIter->num);
SArray *pActions = mndTransGetAction(pShowIter->pTrans, pShowIter->pTrans->stage);
mndTransShowActions(pSdb, pShowIter, pShow, pBlock, rows, &numOfRows, pActions, taosArrayGetSize(pActions), 0);
break;
} else {
mInfo("retrieve trans detail from iter, id:%d, iterStage:%d, IterNum:%d", pShowIter->pTrans->id, pShowIter->stage,
pShowIter->num);
SArray *pActions = mndTransGetAction(pShowIter->pTrans, pShowIter->stage);
mndTransShowActions(pSdb, pShowIter, pShow, pBlock, rows, &numOfRows, pActions,
taosArrayGetSize(pActions) - pShowIter->num, pShowIter->num);
break;
}
}
_OVER:
pShow->numOfRows += numOfRows;
if (code != 0) {
mError("failed to retrieve at line:%d, since %s", lino, tstrerror(code));
} else {
mInfo("retrieve trans detail, numOfRows:%d, pShow->numOfRows:%d", numOfRows, pShow->numOfRows)
}
if (numOfRows == 0) {
taosMemoryFree(pShow->pIter);
pShow->pIter = NULL;
}
return numOfRows;
}
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetchByType(pSdb, pIter, SDB_TRANS);

View File

@ -2766,12 +2766,14 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
mndTransSetSerial(pTrans);
if (pNewVgroup->replica == 1 && pNewDb->cfg.replications == 3) {
if (pNewDb->cfg.replications == 3) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
// add second
TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
if (pNewVgroup->replica == 1){
TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
}
// learner stage
pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
@ -2790,7 +2792,9 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
// add third
TAOS_CHECK_RETURN(mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
if (pNewVgroup->replica == 2){
TAOS_CHECK_RETURN (mndAddVnodeToVgroup(pMnode, pTrans, pNewVgroup, pArray));
}
pNewVgroup->vnodeGid[0].nodeRole = TAOS_SYNC_ROLE_VOTER;
pNewVgroup->vnodeGid[1].nodeRole = TAOS_SYNC_ROLE_VOTER;
@ -2802,7 +2806,7 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
TAOS_CHECK_RETURN(mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &pNewVgroup->vnodeGid[2]));
TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
} else if (pNewVgroup->replica == 3 && pNewDb->cfg.replications == 1) {
} else if (pNewDb->cfg.replications == 1) {
mInfo("db:%s, vgId:%d, will remove 2 vnodes, vn:0 dnode:%d vn:1 dnode:%d vn:2 dnode:%d", pVgroup->dbName,
pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId, pVgroup->vnodeGid[1].dnodeId, pVgroup->vnodeGid[2].dnodeId);
@ -2819,9 +2823,9 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
TAOS_CHECK_RETURN(mndRemoveVnodeFromVgroup(pMnode, pTrans, pNewVgroup, pArray, &del2));
TAOS_CHECK_RETURN(mndAddDropVnodeAction(pMnode, pTrans, pNewDb, pNewVgroup, &del2, true));
TAOS_CHECK_RETURN(
mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
mndAddAlterVnodeReplicaAction(pMnode, pTrans, pNewDb, pNewVgroup, pNewVgroup->vnodeGid[0].dnodeId));
TAOS_CHECK_RETURN(mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, pNewVgroup));
} else if (pNewVgroup->replica == 1 && pNewDb->cfg.replications == 2) {
} else if (pNewDb->cfg.replications == 2) {
mInfo("db:%s, vgId:%d, will add 1 vnode, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);

View File

@ -2543,7 +2543,8 @@ static int32_t doSysTableScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes)
if (pInfo->showRewrite) {
getDBNameFromCondition(pInfo->pCondition, dbName);
if (strncasecmp(name, TSDB_INS_TABLE_COMPACTS, TSDB_TABLE_FNAME_LEN) != 0 &&
strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, TSDB_TABLE_FNAME_LEN) != 0) {
strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, TSDB_TABLE_FNAME_LEN) != 0 &&
strncasecmp(name, TSDB_INS_TABLE_TRANSACTION_DETAILS, TSDB_TABLE_FNAME_LEN) != 0) {
TAOS_UNUSED(tsnprintf(pInfo->req.db, sizeof(pInfo->req.db), "%d.%s", pInfo->accountId, dbName));
}
} else if (strncasecmp(name, TSDB_INS_TABLE_COLS, TSDB_TABLE_FNAME_LEN) == 0) {

View File

@ -292,6 +292,8 @@ const char* nodesNodeName(ENodeType type) {
return "ShowCompactsStmt";
case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT:
return "ShowCompactDetailsStmt";
case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT:
return "ShowTransactionDetailsStmt";
case QUERY_NODE_SHOW_GRANTS_FULL_STMT:
return "ShowGrantsFullStmt";
case QUERY_NODE_SHOW_GRANTS_LOGS_STMT:

View File

@ -715,6 +715,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT:
code = makeNode(type, sizeof(SShowCompactDetailsStmt), &pNode);
break;
case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT:
code = makeNode(type, sizeof(SShowTransactionDetailsStmt), &pNode);
break;
case QUERY_NODE_KILL_QUERY_STMT:
code = makeNode(type, sizeof(SKillQueryStmt), &pNode);
break;
@ -1562,6 +1565,11 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pCompactId);
break;
}
case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT: {
SShowTransactionDetailsStmt* pStmt = (SShowTransactionDetailsStmt*)pNode;
nodesDestroyNode(pStmt->pTransactionId);
break;
}
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
taosMemoryFreeClear(((SShowCreateDatabaseStmt*)pNode)->pCfg);
break;

View File

@ -320,6 +320,7 @@ SNode* createCreateViewStmt(SAstCreateContext* pCxt, bool orReplace, SNode* pVie
SNode* createDropViewStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pView);
SNode* createShowCompactDetailsStmt(SAstCreateContext* pCxt, SNode* pCompactIdNode);
SNode* createShowCompactsStmt(SAstCreateContext* pCxt, ENodeType type);
SNode* createShowTransactionDetailsStmt(SAstCreateContext* pCxt, SNode* pTransactionIdNode);
SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* tsmaName, SNode* pOptions,
SNode* pRealTable, SNode* pInterval);

View File

@ -594,6 +594,7 @@ cmd ::= SHOW BNODES.
cmd ::= SHOW SNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SNODES_STMT); }
cmd ::= SHOW CLUSTER. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CLUSTER_STMT); }
cmd ::= SHOW TRANSACTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_TRANSACTIONS_STMT); }
cmd ::= SHOW TRANSACTION NK_INTEGER(A). { pCxt->pRootNode = createShowTransactionDetailsStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A)); }
cmd ::= SHOW TABLE DISTRIBUTED full_table_name(A). { pCxt->pRootNode = createShowTableDistributedStmt(pCxt, A); }
cmd ::= SHOW CONSUMERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONSUMERS_STMT); }
cmd ::= SHOW SUBSCRIPTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT); }

View File

@ -2931,6 +2931,18 @@ _err:
return NULL;
}
SNode* createShowTransactionDetailsStmt(SAstCreateContext* pCxt, SNode* pTransactionIdNode) {
CHECK_PARSER_STATUS(pCxt);
SShowTransactionDetailsStmt* pStmt = NULL;
pCxt->errCode = nodesMakeNode(QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT, (SNode**)&pStmt);
CHECK_MAKE_NODE(pStmt);
pStmt->pTransactionId = pTransactionIdNode;
return (SNode*)pStmt;
_err:
nodesDestroyNode(pTransactionIdNode);
return NULL;
}
static int32_t getIpV4RangeFromWhitelistItem(char* ipRange, SIpV4Range* pIpRange) {
int32_t code = TSDB_CODE_SUCCESS;
char* ipCopy = taosStrdup(ipRange);

View File

@ -770,6 +770,12 @@ static int32_t collectMetaKeyFromShowCompactDetails(SCollectMetaKeyCxt* pCxt, SS
return code;
}
static int32_t collectMetaKeyFromShowTransactionDetails(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_TRANSACTION_DETAILS, pCxt->pMetaCache);
return code;
}
static int32_t collectMetaKeyFromShowGrantsFull(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_GRANTS_FULL,
pCxt->pMetaCache);
@ -1094,6 +1100,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
return collectMetaKeyFromShowCompacts(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT:
return collectMetaKeyFromShowCompactDetails(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT:
return collectMetaKeyFromShowTransactionDetails(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_GRANTS_FULL_STMT:
return collectMetaKeyFromShowGrantsFull(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_GRANTS_LOGS_STMT:

View File

@ -320,6 +320,12 @@ static const SSysTableShowAdapter sysTableShowAdapter[] = {
.numOfShowCols = 1,
.pShowCols = {"*"}
},
{ .showType = QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_TRANSACTION_DETAILS,
.numOfShowCols = 1,
.pShowCols = {"*"}
},
{ .showType = QUERY_NODE_SHOW_GRANTS_FULL_STMT,
.pDbName = TSDB_INFORMATION_SCHEMA_DB,
.pTableName = TSDB_INS_TABLE_GRANTS_FULL,
@ -16381,6 +16387,24 @@ static int32_t rewriteShowCompactDetailsStmt(STranslateContext* pCxt, SQuery* pQ
return code;
}
static int32_t rewriteShowTransactionDetailsStmt(STranslateContext* pCxt, SQuery* pQuery) {
SShowTransactionDetailsStmt* pShow = (SShowTransactionDetailsStmt*)(pQuery->pRoot);
SSelectStmt* pStmt = NULL;
int32_t code = createSelectStmtForShow(QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT, &pStmt);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pShow->pTransactionId) {
code = createOperatorNode(OP_TYPE_EQUAL, "transaction_id", pShow->pTransactionId, &pStmt->pWhere);
}
}
if (TSDB_CODE_SUCCESS == code) {
pCxt->showRewrite = true;
pQuery->showRewrite = true;
nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pStmt;
}
return code;
}
static int32_t createParWhenThenNode(SNode* pWhen, SNode* pThen, SNode** ppResWhenThen) {
SWhenThenNode* pWThen = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_WHEN_THEN, (SNode**)&pWThen);
@ -16956,6 +16980,9 @@ static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) {
case QUERY_NODE_SHOW_COMPACT_DETAILS_STMT:
code = rewriteShowCompactDetailsStmt(pCxt, pQuery);
break;
case QUERY_NODE_SHOW_TRANSACTION_DETAILS_STMT:
code = rewriteShowTransactionDetailsStmt(pCxt, pQuery);
break;
case QUERY_NODE_SHOW_DB_ALIVE_STMT:
case QUERY_NODE_SHOW_CLUSTER_ALIVE_STMT:
code = rewriteShowAliveStmt(pCxt, pQuery);

View File

@ -326,6 +326,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While ex
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CTX_SWITCH, "Wrong transaction execution context")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT_COMPACT, "Transaction not completed due to conflict with compact")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_ABLE_TO_kILLED, "The transaction is not able to be killed")
// mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")

View File

@ -398,11 +398,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/empty_identifier.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_transaction_detail.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/persisit_config.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/qmemCtrl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/compact_vgroups.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_create.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/composite_primary_key_delete.py

View File

@ -0,0 +1,64 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.dnodes import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to init {__file__}")
self.replicaVar = int(replicaVar)
tdSql.init(conn.cursor(), logSql)
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
def run(self):
tdLog.debug(f"start to excute {__file__}")
tdSql.execute('CREATE DATABASE db vgroups 160 replica 3;')
tdSql.execute('balance vgroup leader')
sql ="show transactions;"
rows = tdSql.query(sql)
if rows > 0:
tranId = tdSql.getData(0, 0)
tdLog.info('kill transaction %d'%tranId)
tdSql.execute('kill transaction %d'%tranId, queryTimes=1 )
if self.waitTransactionZero() is False:
tdLog.exit(f"{sql} transaction not finished")
return False
def waitTransactionZero(self, seconds = 300, interval = 1):
# wait end
for i in range(seconds):
sql ="show transactions;"
rows = tdSql.query(sql)
if rows == 0:
tdLog.info("transaction count became zero.")
return True
#tdLog.info(f"i={i} wait ...")
time.sleep(interval)
return False
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,105 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.dnodes import *
from util.sql import *
from util.cluster import *
import threading
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to init {__file__}")
self.replicaVar = int(replicaVar)
tdSql.init(conn.cursor(), logSql)
self.dnodes = cluster.dnodes
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
def run(self):
tdLog.debug(f"start to excute {__file__}")
tdLog.info("CREATE DATABASE db1 vgroups 16 replica 1;")
tdSql.execute('CREATE DATABASE db1 vgroups 16 replica 1;')
if self.waitTransactionZero() is False:
tdLog.exit(f"{sql} transaction not finished")
return False
newTdSql1=tdCom.newTdSql()
t1 = threading.Thread(target=self.alterDbThread, args=('', newTdSql1))
newTdSql2=tdCom.newTdSql()
t2 = threading.Thread(target=self.createDbThread, args=('', newTdSql2))
t1.start()
t2.start()
dnode = self.dnodes[1]
# stop dnode
tdLog.info(f"stop dnode 1")
dnode.stoptaosd()
rows = tdSql.query("show transactions;")
if rows > 0:
tranId = tdSql.getData(0, 0)
tdLog.info(f"show transaction {tranId}")
rows = tdSql.query(f"show transaction {tranId}", queryTimes=1)
if rows != 160:
tdLog.exit(f"restore transaction detial error, rows={rows}")
return False
rows = tdSql.query("show transactions;")
if rows > 0:
tranId = tdSql.getData(1, 0)
tdLog.info(f"show transaction {tranId}")
rows = tdSql.query(f"show transaction {tranId}", queryTimes=1)
if rows != 176:
tdLog.exit(f"restore transaction detial error, rows={rows}")
return False
tdLog.info(f"select * from ins_transaction_details")
rows = tdSql.query(f"select * from information_schema.ins_transaction_details", queryTimes=1)
if rows != 336:
tdLog.exit(f"restore transaction detial error, rows={rows}")
return False
def createDbThread(self, sql, newTdSql):
tdLog.info("CREATE DATABASE db2 vgroups 160 replica 1;")
newTdSql.execute('CREATE DATABASE db2 vgroups 160 replica 1;')
def alterDbThread(self, sql, newTdSql):
tdLog.info("alter DATABASE db1 replica 3;")
newTdSql.execute('alter DATABASE db1 replica 3;')
def waitTransactionZero(self, seconds = 300, interval = 1):
# wait end
for i in range(seconds):
sql ="show transactions;"
rows = tdSql.query(sql)
if rows == 0:
tdLog.info("transaction count became zero.")
return True
#tdLog.info(f"i={i} wait ...")
time.sleep(interval)
return False
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,90 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
sys.path.append("./3-enterprise/restore")
from restoreBasic import *
from util.common import tdCom
import threading
class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
tdLog.debug("start to execute %s" % __file__)
self.basic = RestoreBasic()
self.basic.init(conn, logSql, replicaVar)
# run
def run(self):
self.basic.restore_dnode_prepare(2)
self.execute()
def execute(self):
newTdSql=tdCom.newTdSql()
t0 = threading.Thread(target=self.restoreDnodeThread, args=('', newTdSql))
t0.start()
time.sleep(2)
sql ="show transactions;"
tdLog.info(sql)
rows = tdSql.query(sql)
if rows > 0:
self.basic.stop_dnode(2)
tranId = tdSql.getData(0, 0)
tdLog.info('show transaction %d'%tranId)
rows=tdSql.query('show transaction %d'%tranId, queryTimes=1)
if rows != 13:
tdLog.exit(f"restore transaction detial error, rows={rows}")
return False
tdLog.info('kill transaction %d'%tranId)
tdSql.execute('kill transaction %d'%tranId, queryTimes=1 )
time.sleep(3)
sql ="show transactions;"
tdLog.info(sql)
rows = tdSql.query(sql)
if rows > 0:
tdLog.info(f"{sql} transaction not finished")
return False
self.basic.restore_dnode_exec(2)
else:
tdLog.exit(f"{sql} no transaction exist")
return False
def restoreDnodeThread(self, p, newTdSql):
sleep(1)
sql = f"restore dnode 2"
tdLog.info(sql)
newTdSql.error(sql, expectErrInfo="Wrong transaction execution context")
tdLog.info(f"{sql} finished")
# stop
def stop(self):
self.basic.stop()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -143,6 +143,34 @@ class RestoreBasic:
tdSql.execute(sql)
self.check_corrent()
def restore_dnode_prepare(self, index):
tdLog.info(f"start restore dnode {index}")
dnode = self.dnodes[index - 1]
# stop dnode
tdLog.info(f"stop dnode {index}")
dnode.stoptaosd()
# remove dnode folder
try:
shutil.rmtree(dnode.dataDir)
tdLog.info(f"delete dir {dnode.dataDir} successful")
except OSError as x:
tdLog.exit(f"remove path {dnode.dataDir} error : {x.strerror}")
dnode.starttaosd()
def restore_dnode_exec(self, index):
# exec restore
sql = f"restore dnode {index}"
tdLog.info(sql)
tdSql.execute(sql)
self.check_corrent()
def stop_dnode(self, index):
dnode = self.dnodes[index - 1]
dnode.starttaosd()
# restore vnode
def restore_vnode(self, index):
tdLog.info(f"start restore vnode on dnode {index}")