diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 931cda475c..a0ba71a1eb 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -104,6 +104,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 777492697a..8a14d882da 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -85,6 +85,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) { mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer); mndTransPullup(pMnode); + sdbVer = sdbUpdateVer(pSdb, 0); + mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer); if (sdbVer != lastSdbVer) { mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5b1abdeb74..7d684cd542 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -442,6 +442,11 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG)); } + if (pNew->stage == TRN_STAGE_ROLLBACK) { + pNew->stage = TRN_STAGE_FINISHED; + mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED)); + } + mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); pOld->stage = pNew->stage; @@ -1226,10 +1231,9 @@ static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) { } code = mndKillTrans(pMnode, pTrans); - if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; KILL_OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { mError("trans:%d, failed to kill since %s", killReq.transId, terrstr()); return -1; } diff --git a/source/dnode/mnode/impl/test/trans/trans.cpp b/source/dnode/mnode/impl/test/trans/trans.cpp index d29c4fd658..d4c40dd428 100644 --- a/source/dnode/mnode/impl/test/trans/trans.cpp +++ b/source/dnode/mnode/impl/test/trans/trans.cpp @@ -28,7 +28,7 @@ class MndTestTrans : public ::testing::Test { static void KillThenRestartServer() { char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data"; FileFd fd = taosOpenFileRead(file); - int32_t size = 1024 * 1024; + int32_t size = 3 * 1024 * 1024; void* buffer = malloc(size); int32_t readLen = taosReadFile(fd, buffer, size); if (readLen < 0 || readLen == size) { @@ -62,19 +62,34 @@ Testbase MndTestTrans::test; TestServer MndTestTrans::server2; TEST_F(MndTestTrans, 00_Create_User_Crash) { - test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); - CHECK_META("show trans", 7); + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); + CHECK_META("show trans", 7); - CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id"); - CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); - CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage"); - CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db"); - CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type"); - CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time"); - CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error"); + CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id"); + CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); + CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage"); + CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db"); + CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type"); + CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time"); + CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error"); - test.SendShowRetrieveReq(); - EXPECT_EQ(test.GetShowRows(), 0); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } + + { + SKillTransReq killReq = {0}; + killReq.transId = 3; + + int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSKillTransReq(pReq, contLen, &killReq); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TRANS_NOT_EXIST); + } } TEST_F(MndTestTrans, 01_Create_User_Crash) { @@ -192,7 +207,7 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); CHECK_META("show trans", 7); test.SendShowRetrieveReq(); - + EXPECT_EQ(test.GetShowRows(), 1); CheckInt32(4); CheckTimestamp(); @@ -202,12 +217,41 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { CheckTimestamp(); CheckBinary("Unable to establish connection", TSDB_TRANS_ERROR_LEN - 1); } - - //kill trans + + // kill trans + { + SKillTransReq killReq = {0}; + killReq.transId = 4; + + int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSKillTransReq(pReq, contLen, &killReq); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } // show trans + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, ""); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } // re-create trans + { + SMCreateQnodeReq createReq = {0}; + createReq.dnodeId = 2; + + int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq); + + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } KillThenRestartServer(); @@ -240,7 +284,6 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { } } - // create db // partial create stb // drop db failed