Merge pull request #10314 from taosdata/feature/privilege
fix trans bug
This commit is contained in:
commit
c1d2446995
|
@ -215,8 +215,8 @@ do { \
|
||||||
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
#define TSDB_SLOW_QUERY_SQL_LEN 512
|
||||||
|
|
||||||
#define TSDB_TRANS_STAGE_LEN 12
|
#define TSDB_TRANS_STAGE_LEN 12
|
||||||
#define TSDB_TRANS_DESC_LEN 16
|
#define TSDB_TRANS_TYPE_LEN 16
|
||||||
#define TSDB_TRANS_ERROR_LEN 128
|
#define TSDB_TRANS_ERROR_LEN 64
|
||||||
|
|
||||||
#define TSDB_STEP_NAME_LEN 32
|
#define TSDB_STEP_NAME_LEN 32
|
||||||
#define TSDB_STEP_DESC_LEN 128
|
#define TSDB_STEP_DESC_LEN 128
|
||||||
|
|
|
@ -104,6 +104,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
|
||||||
|
|
|
@ -85,6 +85,8 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
mDebug("restore sdb wal finished, sdb ver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
mndTransPullup(pMnode);
|
mndTransPullup(pMnode);
|
||||||
|
sdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
|
mDebug("pullup trans finished, sdb ver:%" PRId64, sdbVer);
|
||||||
|
|
||||||
if (sdbVer != lastSdbVer) {
|
if (sdbVer != lastSdbVer) {
|
||||||
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
|
||||||
|
|
|
@ -221,7 +221,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
pTrans = sdbGetRowObj(pRow);
|
pTrans = sdbGetRowObj(pRow);
|
||||||
if (pTrans == NULL) goto TRANS_DECODE_OVER;
|
if (pTrans == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
|
||||||
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
|
||||||
|
|
||||||
int16_t type = 0;
|
int16_t type = 0;
|
||||||
|
@ -353,8 +352,60 @@ static const char *mndTransStr(ETrnStage stage) {
|
||||||
|
|
||||||
static const char *mndTransType(ETrnType type) {
|
static const char *mndTransType(ETrnType type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
case TRN_TYPE_CREATE_USER:
|
||||||
|
return "create-user";
|
||||||
|
case TRN_TYPE_ALTER_USER:
|
||||||
|
return "alter-user";
|
||||||
|
case TRN_TYPE_DROP_USER:
|
||||||
|
return "drop-user";
|
||||||
|
case TRN_TYPE_CREATE_FUNC:
|
||||||
|
return "create-func";
|
||||||
|
case TRN_TYPE_DROP_FUNC:
|
||||||
|
return "drop-func";
|
||||||
|
case TRN_TYPE_CREATE_SNODE:
|
||||||
|
return "create-snode";
|
||||||
|
case TRN_TYPE_DROP_SNODE:
|
||||||
|
return "drop-snode";
|
||||||
|
case TRN_TYPE_CREATE_QNODE:
|
||||||
|
return "create-qnode";
|
||||||
|
case TRN_TYPE_DROP_QNODE:
|
||||||
|
return "drop-qnode";
|
||||||
|
case TRN_TYPE_CREATE_BNODE:
|
||||||
|
return "create-bnode";
|
||||||
|
case TRN_TYPE_DROP_BNODE:
|
||||||
|
return "drop-bnode";
|
||||||
|
case TRN_TYPE_CREATE_MNODE:
|
||||||
|
return "create-mnode";
|
||||||
|
case TRN_TYPE_DROP_MNODE:
|
||||||
|
return "drop-mnode";
|
||||||
|
case TRN_TYPE_CREATE_TOPIC:
|
||||||
|
return "create-topic";
|
||||||
|
case TRN_TYPE_DROP_TOPIC:
|
||||||
|
return "drop-topic";
|
||||||
|
case TRN_TYPE_SUBSCRIBE:
|
||||||
|
return "subscribe";
|
||||||
|
case TRN_TYPE_REBALANCE:
|
||||||
|
return "rebalance";
|
||||||
|
case TRN_TYPE_CREATE_DNODE:
|
||||||
|
return "create-qnode";
|
||||||
|
case TRN_TYPE_DROP_DNODE:
|
||||||
|
return "drop-qnode";
|
||||||
case TRN_TYPE_CREATE_DB:
|
case TRN_TYPE_CREATE_DB:
|
||||||
return "create-db";
|
return "create-db";
|
||||||
|
case TRN_TYPE_ALTER_DB:
|
||||||
|
return "alter-db";
|
||||||
|
case TRN_TYPE_DROP_DB:
|
||||||
|
return "drop-db";
|
||||||
|
case TRN_TYPE_SPLIT_VGROUP:
|
||||||
|
return "split-vgroup";
|
||||||
|
case TRN_TYPE_MERGE_VGROUP:
|
||||||
|
return "merge-vgroup";
|
||||||
|
case TRN_TYPE_CREATE_STB:
|
||||||
|
return "create-stb";
|
||||||
|
case TRN_TYPE_ALTER_STB:
|
||||||
|
return "alter-stb";
|
||||||
|
case TRN_TYPE_DROP_STB:
|
||||||
|
return "drop-stb";
|
||||||
default:
|
default:
|
||||||
return "invalid";
|
return "invalid";
|
||||||
}
|
}
|
||||||
|
@ -391,6 +442,11 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
||||||
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
|
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pNew->stage == TRN_STAGE_ROLLBACK) {
|
||||||
|
pNew->stage = TRN_STAGE_FINISHED;
|
||||||
|
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED));
|
||||||
|
}
|
||||||
|
|
||||||
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
|
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
|
||||||
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
|
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
|
||||||
pOld->stage = pNew->stage;
|
pOld->stage = pNew->stage;
|
||||||
|
@ -423,6 +479,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
|
||||||
pTrans->stage = TRN_STAGE_PREPARE;
|
pTrans->stage = TRN_STAGE_PREPARE;
|
||||||
pTrans->policy = policy;
|
pTrans->policy = policy;
|
||||||
pTrans->transType = type;
|
pTrans->transType = type;
|
||||||
|
pTrans->createdTime = taosGetTimestampMs();
|
||||||
pTrans->rpcHandle = pReq->handle;
|
pTrans->rpcHandle = pReq->handle;
|
||||||
pTrans->rpcAHandle = pReq->ahandle;
|
pTrans->rpcAHandle = pReq->ahandle;
|
||||||
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
||||||
|
@ -754,6 +811,9 @@ void mndTransProcessRsp(SMnodeMsg *pRsp) {
|
||||||
if (pAction != NULL) {
|
if (pAction != NULL) {
|
||||||
pAction->msgReceived = 1;
|
pAction->msgReceived = 1;
|
||||||
pAction->errCode = pRsp->rpcMsg.code;
|
pAction->errCode = pRsp->rpcMsg.code;
|
||||||
|
if (pAction->errCode != 0) {
|
||||||
|
tstrncpy(pTrans->lastError, tstrerror(pAction->errCode), TSDB_TRANS_ERROR_LEN);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
|
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
|
||||||
|
@ -1059,6 +1119,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
|
|
||||||
while (continueExec) {
|
while (continueExec) {
|
||||||
|
pTrans->lastExecTime = taosGetTimestampMs();
|
||||||
switch (pTrans->stage) {
|
switch (pTrans->stage) {
|
||||||
case TRN_STAGE_PREPARE:
|
case TRN_STAGE_PREPARE:
|
||||||
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
|
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
|
||||||
|
@ -1170,10 +1231,9 @@ static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndKillTrans(pMnode, pTrans);
|
code = mndKillTrans(pMnode, pTrans);
|
||||||
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
|
||||||
|
|
||||||
KILL_OVER:
|
KILL_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
|
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1228,7 +1288,7 @@ static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pShow->bytes[cols] = (TSDB_TRANS_DESC_LEN - 1) + VARSTR_HEADER_SIZE;
|
pShow->bytes[cols] = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE;
|
||||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
strcpy(pSchema[cols].name, "type");
|
strcpy(pSchema[cols].name, "type");
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
|
@ -1288,11 +1348,7 @@ static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
char *name = mnGetDbStr(pTrans->dbname);
|
char *name = mnGetDbStr(pTrans->dbname);
|
||||||
if (name != NULL) {
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
|
||||||
} else {
|
|
||||||
STR_TO_VARSTR(pWrite, "-");
|
|
||||||
}
|
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
|
|
@ -28,7 +28,7 @@ class MndTestTrans : public ::testing::Test {
|
||||||
static void KillThenRestartServer() {
|
static void KillThenRestartServer() {
|
||||||
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
|
char file[PATH_MAX] = "/tmp/mnode_test_trans/mnode/data/sdb.data";
|
||||||
FileFd fd = taosOpenFileRead(file);
|
FileFd fd = taosOpenFileRead(file);
|
||||||
int32_t size = 1024 * 1024;
|
int32_t size = 3 * 1024 * 1024;
|
||||||
void* buffer = malloc(size);
|
void* buffer = malloc(size);
|
||||||
int32_t readLen = taosReadFile(fd, buffer, size);
|
int32_t readLen = taosReadFile(fd, buffer, size);
|
||||||
if (readLen < 0 || readLen == size) {
|
if (readLen < 0 || readLen == size) {
|
||||||
|
@ -61,6 +61,37 @@ class MndTestTrans : public ::testing::Test {
|
||||||
Testbase MndTestTrans::test;
|
Testbase MndTestTrans::test;
|
||||||
TestServer MndTestTrans::server2;
|
TestServer MndTestTrans::server2;
|
||||||
|
|
||||||
|
TEST_F(MndTestTrans, 00_Create_User_Crash) {
|
||||||
|
{
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
||||||
|
CHECK_META("show trans", 7);
|
||||||
|
|
||||||
|
CHECK_SCHEMA(0, TSDB_DATA_TYPE_INT, 4, "id");
|
||||||
|
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||||
|
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, "stage");
|
||||||
|
CHECK_SCHEMA(3, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "db");
|
||||||
|
CHECK_SCHEMA(4, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, "type");
|
||||||
|
CHECK_SCHEMA(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "last_exec_time");
|
||||||
|
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, TSDB_TRANS_ERROR_LEN - 1 + VARSTR_HEADER_SIZE, "last_error");
|
||||||
|
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
SKillTransReq killReq = {0};
|
||||||
|
killReq.transId = 3;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSKillTransReq(pReq, contLen, &killReq);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TRANS_NOT_EXIST);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(MndTestTrans, 01_Create_User_Crash) {
|
TEST_F(MndTestTrans, 01_Create_User_Crash) {
|
||||||
{
|
{
|
||||||
SCreateUserReq createReq = {0};
|
SCreateUserReq createReq = {0};
|
||||||
|
@ -171,6 +202,57 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// show trans
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
||||||
|
CHECK_META("show trans", 7);
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
|
CheckInt32(4);
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckBinary("undoAction", TSDB_TRANS_STAGE_LEN);
|
||||||
|
CheckBinary("", TSDB_DB_NAME_LEN - 1);
|
||||||
|
CheckBinary("create-qnode", TSDB_TRANS_TYPE_LEN);
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckBinary("Unable to establish connection", TSDB_TRANS_ERROR_LEN - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// kill trans
|
||||||
|
{
|
||||||
|
SKillTransReq killReq = {0};
|
||||||
|
killReq.transId = 4;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSKillTransReq(NULL, 0, &killReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSKillTransReq(pReq, contLen, &killReq);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_KILL_TRANS, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// show trans
|
||||||
|
{
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TRANS, "");
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// re-create trans
|
||||||
|
{
|
||||||
|
SMCreateQnodeReq createReq = {0};
|
||||||
|
createReq.dnodeId = 2;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
|
||||||
|
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
|
||||||
|
}
|
||||||
|
|
||||||
KillThenRestartServer();
|
KillThenRestartServer();
|
||||||
|
|
||||||
server2.DoStart();
|
server2.DoStart();
|
||||||
|
@ -200,4 +282,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
|
||||||
test.SendShowRetrieveReq();
|
test.SendShowRetrieveReq();
|
||||||
EXPECT_EQ(test.GetShowRows(), 2);
|
EXPECT_EQ(test.GetShowRows(), 2);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create db
|
||||||
|
// partial create stb
|
||||||
|
// drop db failed
|
||||||
|
// create stb failed
|
||||||
|
// start
|
||||||
|
// create stb success
|
||||||
|
// drop db success
|
||||||
|
|
Loading…
Reference in New Issue