when wal is restored, the stage of trans is changed from rollback to finish
This commit is contained in:
parent
44eb20628e
commit
1db7368570
|
@ -104,6 +104,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
|
||||||
|
|
|
@ -85,6 +85,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
mndTransPullup(pMnode);
|
mndTransPullup(pMnode);
|
||||||
|
sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
|
mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
if (sdbVer != lastSdbVer) {
|
if (sdbVer != lastSdbVer) {
|
||||||
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
||||||
|
|
|
@ -442,6 +442,11 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
||||||
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
|
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pNew->stage == TRN_STAGE_ROLLBACK) {
|
||||||
|
pNew->stage = TRN_STAGE_FINISHED;
|
||||||
|
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED));
|
||||||
|
}
|
||||||
|
|
||||||
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
|
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
|
||||||
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
|
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
|
||||||
pOld->stage = pNew->stage;
|
pOld->stage = pNew->stage;
|
||||||
|
@ -1226,10 +1231,9 @@ static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndKillTrans(pMnode, pTrans);
|
code = mndKillTrans(pMnode, pTrans);
|
||||||
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
||||||
|
|
||||||
KILL_OVER:
|
KILL_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
|
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ class MndTestTrans : public ::testing::Test {
|
||||||
static void KillThenRestartServer() {
|
static void KillThenRestartServer() {
|
||||||
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
|
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
|
||||||
FileFd fd = taosOpenFileRead(file);
|
FileFd fd = taosOpenFileRead(file);
|
||||||
int32_t size = 1024 * 1024;
|
int32_t size = 3 * 1024 * 1024;
|
||||||
void* buffer = malloc(size);
|
void* buffer = malloc(size);
|
||||||
int32_t readLen = taosReadFile(fd, buffer, size);
|
int32_t readLen = taosReadFile(fd, buffer, size);
|
||||||
if (readLen < 0 || readLen == size) {
|
if (readLen < 0 || readLen == size) {
|
||||||
|
@ -62,19 +62,34 @@ Testbase MndTestTrans::test;
|
||||||
TestServer MndTestTrans::server2;
|
TestServer MndTestTrans::server2;
|
||||||
|
|
||||||
TEST_F(MndTestTrans, 00_Create_User_Crash) {
|
TEST_F(MndTestTrans, 00_Create_User_Crash) {
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
{
|
||||||
CHECK_META("show trans", 7);
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
||||||
|
CHECK_META("show trans", 7);
|
||||||
|
|
||||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id");
|
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id");
|
||||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage");
|
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage");
|
||||||
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db");
|
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db");
|
||||||
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type");
|
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type");
|
||||||
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time");
|
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time");
|
||||||
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error");
|
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error");
|
||||||
|
|
||||||
test.SendShowRetrieveReq();
|
test.SendShowRetrieveReq();
|
||||||
EXPECT_EQ(test.GetShowRows(), 0);
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SKillTransReq killReq = {0};
|
||||||
|
killReq.transId = 3;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSKillTransReq(pReq, contLen, &killReq);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TRANS_NOT_EXIST);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(MndTestTrans, 01_Create_User_Crash) {
|
TEST_F(MndTestTrans, 01_Create_User_Crash) {
|
||||||
|
@ -192,7 +207,7 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
|
||||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
||||||
CHECK_META("show trans", 7);
|
CHECK_META("show trans", 7);
|
||||||
test.SendShowRetrieveReq();
|
test.SendShowRetrieveReq();
|
||||||
|
|
||||||
EXPECT_EQ(test.GetShowRows(), 1);
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
CheckInt32(4);
|
CheckInt32(4);
|
||||||
CheckTimestamp();
|
CheckTimestamp();
|
||||||
|
@ -202,12 +217,41 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
|
||||||
CheckTimestamp();
|
CheckTimestamp();
|
||||||
CheckBinary("Unable to establish connection", TSDB_TRANS_ERROR_LEN - 1);
|
CheckBinary("Unable to establish connection", TSDB_TRANS_ERROR_LEN - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
//kill trans
|
// kill trans
|
||||||
|
{
|
||||||
|
SKillTransReq killReq = {0};
|
||||||
|
killReq.transId = 4;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSKillTransReq(pReq, contLen, &killReq);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
// show trans
|
// show trans
|
||||||
|
{
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
// re-create trans
|
// re-create trans
|
||||||
|
{
|
||||||
|
SMCreateQnodeReq createReq = {0};
|
||||||
|
createReq.dnodeId = 2;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
|
}
|
||||||
|
|
||||||
KillThenRestartServer();
|
KillThenRestartServer();
|
||||||
|
|
||||||
|
@ -240,7 +284,6 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// create db
|
// create db
|
||||||
// partial create stb
|
// partial create stb
|
||||||
// drop db failed
|
// drop db failed
|
||||||
|
|
Loading…
Reference in New Issue