diff --git a/include/util/tdef.h b/include/util/tdef.h index 9b18c2dfb8..35655c8eaf 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -215,8 +215,8 @@ do { \ #define TSDB_SLOW_QUERY_SQL_LEN 512 #define TSDB_TRANS_STAGE_LEN 12 -#define TSDB_TRANS_DESC_LEN 16 -#define TSDB_TRANS_ERROR_LEN 128 +#define TSDB_TRANS_TYPE_LEN 16 +#define TSDB_TRANS_ERROR_LEN 64 #define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_DESC_LEN 128 diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 931cda475c..a0ba71a1eb 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -104,6 +104,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 777492697a..8a14d882da 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -85,6 +85,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) { mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer); mndTransPullup(pMnode); + sdbVer = sdbUpdateVer(pSdb, 0); + mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer); if (sdbVer != lastSdbVer) { mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index f7226d9df7..7d684cd542 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -221,7 +221,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans = sdbGetRowObj(pRow); if (pTrans == NULL) goto TRANS_DECODE_OVER; - SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER) int16_t type = 0; @@ -353,8 +352,60 @@ static const char *mndTransStr(ETrnStage stage) { static const char *mndTransType(ETrnType type) { switch (type) { + case TRN_TYPE_CREATE_USER: + return "create-user"; + case TRN_TYPE_ALTER_USER: + return "alter-user"; + case TRN_TYPE_DROP_USER: + return "drop-user"; + case TRN_TYPE_CREATE_FUNC: + return "create-func"; + case TRN_TYPE_DROP_FUNC: + return "drop-func"; + case TRN_TYPE_CREATE_SNODE: + return "create-snode"; + case TRN_TYPE_DROP_SNODE: + return "drop-snode"; + case TRN_TYPE_CREATE_QNODE: + return "create-qnode"; + case TRN_TYPE_DROP_QNODE: + return "drop-qnode"; + case TRN_TYPE_CREATE_BNODE: + return "create-bnode"; + case TRN_TYPE_DROP_BNODE: + return "drop-bnode"; + case TRN_TYPE_CREATE_MNODE: + return "create-mnode"; + case TRN_TYPE_DROP_MNODE: + return "drop-mnode"; + case TRN_TYPE_CREATE_TOPIC: + return "create-topic"; + case TRN_TYPE_DROP_TOPIC: + return "drop-topic"; + case TRN_TYPE_SUBSCRIBE: + return "subscribe"; + case TRN_TYPE_REBALANCE: + return "rebalance"; + case TRN_TYPE_CREATE_DNODE: + return "create-qnode"; + case TRN_TYPE_DROP_DNODE: + return "drop-qnode"; case TRN_TYPE_CREATE_DB: return "create-db"; + case TRN_TYPE_ALTER_DB: + return "alter-db"; + case TRN_TYPE_DROP_DB: + return "drop-db"; + case TRN_TYPE_SPLIT_VGROUP: + return "split-vgroup"; + case TRN_TYPE_MERGE_VGROUP: + return "merge-vgroup"; + case TRN_TYPE_CREATE_STB: + return "create-stb"; + case TRN_TYPE_ALTER_STB: + return "alter-stb"; + case TRN_TYPE_DROP_STB: + return "drop-stb"; default: return "invalid"; } @@ -391,6 +442,11 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG)); } + if (pNew->stage == TRN_STAGE_ROLLBACK) { + pNew->stage = TRN_STAGE_FINISHED; + mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED)); + } + mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); pOld->stage = pNew->stage; @@ -423,6 +479,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; pTrans->transType = type; + pTrans->createdTime = taosGetTimestampMs(); pTrans->rpcHandle = pReq->handle; pTrans->rpcAHandle = pReq->ahandle; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -754,6 +811,9 @@ void mndTransProcessRsp(SMnodeMsg *pRsp) { if (pAction != NULL) { pAction->msgReceived = 1; pAction->errCode = pRsp->rpcMsg.code; + if (pAction->errCode != 0) { + tstrncpy(pTrans->lastError, tstrerror(pAction->errCode), TSDB_TRANS_ERROR_LEN); + } } mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code, @@ -1059,6 +1119,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; while (continueExec) { + pTrans->lastExecTime = taosGetTimestampMs(); switch (pTrans->stage) { case TRN_STAGE_PREPARE: continueExec = mndTransPerformPrepareStage(pMnode, pTrans); @@ -1170,10 +1231,9 @@ static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { } code = mndKillTrans(pMnode, pTrans); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; KILL_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); return -1; } @@ -1228,7 +1288,7 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pSchema[cols].bytes = pShow->bytes[cols]; cols++; - pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE; + pShow->bytes[cols] = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "type"); pSchema[cols].bytes = pShow->bytes[cols]; @@ -1288,11 +1348,7 @@ static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, in pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; char *name = mnGetDbStr(pTrans->dbname); - if (name != NULL) { - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); - } else { - STR_TO_VARSTR(pWrite, "-"); - } + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; diff --git a/source/dnode/mnode/impl/test/trans/trans.cpp b/source/dnode/mnode/impl/test/trans/trans.cpp index 8a62ed639a..d4c40dd428 100644 --- a/source/dnode/mnode/impl/test/trans/trans.cpp +++ b/source/dnode/mnode/impl/test/trans/trans.cpp @@ -28,7 +28,7 @@ class MndTestTrans : public ::testing::Test { static void KillThenRestartServer() { char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data"; FileFd fd = taosOpenFileRead(file); - int32_t size = 1024 * 1024; + int32_t size = 3 * 1024 * 1024; void* buffer = malloc(size); int32_t readLen = taosReadFile(fd, buffer, size); if (readLen < 0 || readLen == size) { @@ -61,6 +61,37 @@ class MndTestTrans : public ::testing::Test { Testbase MndTestTrans::test; TestServer MndTestTrans::server2; +TEST_F(MndTestTrans, 00_Create_User_Crash) { + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); + CHECK_META("show trans", 7); + + CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id"); + CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); + CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage"); + CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db"); + CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type"); + CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time"); + CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error"); + + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } + + { + SKillTransReq killReq = {0}; + killReq.transId = 3; + + int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSKillTransReq(pReq, contLen, &killReq); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TRANS_NOT_EXIST); + } +} + TEST_F(MndTestTrans, 01_Create_User_Crash) { { SCreateUserReq createReq = {0}; @@ -171,6 +202,57 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); } + { + // show trans + test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); + CHECK_META("show trans", 7); + test.SendShowRetrieveReq(); + + EXPECT_EQ(test.GetShowRows(), 1); + CheckInt32(4); + CheckTimestamp(); + CheckBinary("undoAction", TSDB_TRANS_STAGE_LEN); + CheckBinary("", TSDB_DB_NAME_LEN - 1); + CheckBinary("create-qnode", TSDB_TRANS_TYPE_LEN); + CheckTimestamp(); + CheckBinary("Unable to establish connection", TSDB_TRANS_ERROR_LEN - 1); + } + + // kill trans + { + SKillTransReq killReq = {0}; + killReq.transId = 4; + + int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSKillTransReq(pReq, contLen, &killReq); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + // show trans + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } + + // re-create trans + { + SMCreateQnodeReq createReq = {0}; + createReq.dnodeId = 2; + + int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + KillThenRestartServer(); server2.DoStart(); @@ -200,4 +282,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { test.SendShowRetrieveReq(); EXPECT_EQ(test.GetShowRows(), 2); } -} \ No newline at end of file +} + +// create db +// partial create stb +// drop db failed +// create stb failed +// start +// create stb success +// drop db success