commit
6b24cb5402
|
@ -977,6 +977,13 @@ typedef struct {
|
||||||
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
|
int32_t tSerializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
|
||||||
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
|
int32_t tDeserializeSKillConnReq(void* buf, int32_t bufLen, SKillConnReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t transId;
|
||||||
|
} SKillTransReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
|
||||||
|
int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
char spi;
|
char spi;
|
||||||
|
|
|
@ -136,6 +136,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "mnode-kill-trans", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||||
|
|
|
@ -246,6 +246,7 @@ int32_t* taosGetErrno();
|
||||||
// mnode-trans
|
// mnode-trans
|
||||||
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
||||||
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
|
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
|
||||||
|
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
|
||||||
|
|
||||||
// mnode-mq
|
// mnode-mq
|
||||||
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
||||||
|
|
|
@ -2239,6 +2239,31 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->transId) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSKillTransReq(void *buf, int32_t bufLen, SKillTransReq *pReq) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->transId) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
|
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
|
@ -27,6 +27,7 @@ void mndCleanupDb(SMnode *pMnode);
|
||||||
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
|
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
|
||||||
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
||||||
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
||||||
|
char *mnGetDbStr(char *src);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,7 +105,39 @@ typedef enum {
|
||||||
} ETrnStage;
|
} ETrnStage;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TRN_TYPE_CREATE_DB = 0,
|
TRN_TYPE_BASIC_SCOPE = 1000,
|
||||||
|
TRN_TYPE_CREATE_USER = 1001,
|
||||||
|
TRN_TYPE_ALTER_USER = 1002,
|
||||||
|
TRN_TYPE_DROP_USER = 1003,
|
||||||
|
TRN_TYPE_CREATE_FUNC = 1004,
|
||||||
|
TRN_TYPE_DROP_FUNC = 1005,
|
||||||
|
TRN_TYPE_CREATE_SNODE = 1006,
|
||||||
|
TRN_TYPE_DROP_SNODE = 1007,
|
||||||
|
TRN_TYPE_CREATE_QNODE = 1008,
|
||||||
|
TRN_TYPE_DROP_QNODE = 1009,
|
||||||
|
TRN_TYPE_CREATE_BNODE = 1010,
|
||||||
|
TRN_TYPE_DROP_BNODE = 1011,
|
||||||
|
TRN_TYPE_CREATE_MNODE = 1012,
|
||||||
|
TRN_TYPE_DROP_MNODE = 1013,
|
||||||
|
TRN_TYPE_CREATE_TOPIC = 1014,
|
||||||
|
TRN_TYPE_DROP_TOPIC = 1015,
|
||||||
|
TRN_TYPE_SUBSCRIBE = 1016,
|
||||||
|
TRN_TYPE_REBALANCE = 1017,
|
||||||
|
TRN_TYPE_BASIC_SCOPE_END,
|
||||||
|
TRN_TYPE_GLOBAL_SCOPE = 2000,
|
||||||
|
TRN_TYPE_CREATE_DNODE = 2001,
|
||||||
|
TRN_TYPE_DROP_DNODE = 2002,
|
||||||
|
TRN_TYPE_GLOBAL_SCOPE_END,
|
||||||
|
TRN_TYPE_DB_SCOPE = 3000,
|
||||||
|
TRN_TYPE_CREATE_DB = 3001,
|
||||||
|
TRN_TYPE_ALTER_DB = 3002,
|
||||||
|
TRN_TYPE_DROP_DB = 3003,
|
||||||
|
TRN_TYPE_CREATE_STB = 3004,
|
||||||
|
TRN_TYPE_ALTER_STB = 3005,
|
||||||
|
TRN_TYPE_DROP_STB = 3006,
|
||||||
|
TRN_TYPE_SPLIT_VGROUP = 3007,
|
||||||
|
TRN_TYPE_MERGE_VGROUP = 3018,
|
||||||
|
TRN_TYPE_DB_SCOPE_END,
|
||||||
} ETrnType;
|
} ETrnType;
|
||||||
|
|
||||||
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
|
||||||
|
@ -128,6 +160,7 @@ typedef struct {
|
||||||
int32_t id;
|
int32_t id;
|
||||||
ETrnStage stage;
|
ETrnStage stage;
|
||||||
ETrnPolicy policy;
|
ETrnPolicy policy;
|
||||||
|
ETrnType transType;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t failedTimes;
|
int32_t failedTimes;
|
||||||
void* rpcHandle;
|
void* rpcHandle;
|
||||||
|
@ -141,10 +174,9 @@ typedef struct {
|
||||||
SArray* undoActions;
|
SArray* undoActions;
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
int64_t lastExecTime;
|
int64_t lastExecTime;
|
||||||
int32_t transType;
|
|
||||||
uint64_t dbUid;
|
uint64_t dbUid;
|
||||||
char dbname[TSDB_DB_NAME_LEN];
|
char dbname[TSDB_DB_FNAME_LEN];
|
||||||
char lastError[TSDB_TRANS_DESC_LEN];
|
char lastError[TSDB_TRANS_ERROR_LEN];
|
||||||
} STrans;
|
} STrans;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -36,7 +36,7 @@ typedef struct {
|
||||||
int32_t mndInitTrans(SMnode *pMnode);
|
int32_t mndInitTrans(SMnode *pMnode);
|
||||||
void mndCleanupTrans(SMnode *pMnode);
|
void mndCleanupTrans(SMnode *pMnode);
|
||||||
|
|
||||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq);
|
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
|
||||||
void mndTransDrop(STrans *pTrans);
|
void mndTransDrop(STrans *pTrans);
|
||||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
|
@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
|
||||||
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
|
||||||
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
||||||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||||
|
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
|
||||||
|
|
||||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||||
void mndTransProcessRsp(SMnodeMsg *pRsp);
|
void mndTransProcessRsp(SMnodeMsg *pRsp);
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
||||||
#define TSDB_BNODE_VER_NUMBER 1
|
#define TSDB_BNODE_VER_NUMBER 1
|
||||||
#define TSDB_BNODE_RESERVE_SIZE 64
|
#define TSDB_BNODE_RESERVE_SIZE 64
|
||||||
|
|
||||||
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj);
|
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj);
|
||||||
|
@ -248,7 +248,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
||||||
bnodeObj.createdTime = taosGetTimestampMs();
|
bnodeObj.createdTime = taosGetTimestampMs();
|
||||||
bnodeObj.updateTime = bnodeObj.createdTime;
|
bnodeObj.updateTime = bnodeObj.createdTime;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_BNODE_OVER;
|
if (pTrans == NULL) goto CREATE_BNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
|
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
@ -366,7 +366,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
|
||||||
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) {
|
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pReq, SBnodeObj *pObj) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_BNODE_OVER;
|
if (pTrans == NULL) goto DROP_BNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
|
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
|
||||||
#define TSDB_DB_VER_NUMBER 1
|
#define TSDB_DB_VER_NUMBER 1
|
||||||
#define TSDB_DB_RESERVE_SIZE 64
|
#define TSDB_DB_RESERVE_SIZE 64
|
||||||
|
|
||||||
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
static SSdbRaw *mndDbActionEncode(SDbObj *pDb);
|
||||||
|
@ -434,11 +434,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbReq *pCreat
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_DB_OVER;
|
if (pTrans == NULL) goto CREATE_DB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||||
|
|
||||||
|
mndTransSetDbInfo(pTrans, &dbObj);
|
||||||
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||||
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||||
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER;
|
||||||
|
@ -620,11 +621,12 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
|
|
||||||
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto UPDATE_DB_OVER;
|
if (pTrans == NULL) goto UPDATE_DB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
|
mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name);
|
||||||
|
|
||||||
|
mndTransSetDbInfo(pTrans, pOld);
|
||||||
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
||||||
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
||||||
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto UPDATE_DB_OVER;
|
||||||
|
@ -799,10 +801,11 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
|
|
||||||
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
|
static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_DB_OVER;
|
if (pTrans == NULL) goto DROP_DB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
||||||
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
|
|
||||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
||||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER;
|
||||||
|
|
|
@ -440,7 +440,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq *
|
||||||
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
|
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
|
||||||
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
|
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -516,7 +516,7 @@ CREATE_DNODE_OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) {
|
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
||||||
#define SDB_FUNC_VER 1
|
#define SDB_FUNC_VER 1
|
||||||
#define SDB_FUNC_RESERVE_SIZE 64
|
#define SDB_FUNC_RESERVE_SIZE 64
|
||||||
|
|
||||||
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
|
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
|
||||||
|
@ -206,7 +206,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pReq, SCreateFuncReq *pC
|
||||||
memcpy(func.pComment, pCreate->pComment, pCreate->commentSize);
|
memcpy(func.pComment, pCreate->pComment, pCreate->commentSize);
|
||||||
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
||||||
|
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_FUNC_OVER;
|
if (pTrans == NULL) goto CREATE_FUNC_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
|
||||||
|
@ -236,7 +236,7 @@ CREATE_FUNC_OVER:
|
||||||
|
|
||||||
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
|
static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pReq, SFuncObj *pFunc) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_FUNC_OVER;
|
if (pTrans == NULL) goto DROP_FUNC_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
|
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
||||||
#define TSDB_MNODE_VER_NUMBER 1
|
#define TSDB_MNODE_VER_NUMBER 1
|
||||||
#define TSDB_MNODE_RESERVE_SIZE 64
|
#define TSDB_MNODE_RESERVE_SIZE 64
|
||||||
|
|
||||||
static int32_t mndCreateDefaultMnode(SMnode *pMnode);
|
static int32_t mndCreateDefaultMnode(SMnode *pMnode);
|
||||||
|
@ -359,7 +359,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
||||||
mnodeObj.createdTime = taosGetTimestampMs();
|
mnodeObj.createdTime = taosGetTimestampMs();
|
||||||
mnodeObj.updateTime = mnodeObj.createdTime;
|
mnodeObj.updateTime = mnodeObj.createdTime;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_MNODE_OVER;
|
if (pTrans == NULL) goto CREATE_MNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
@ -526,7 +526,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
||||||
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) {
|
static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pReq, SMnodeObj *pObj) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_MNODE_OVER;
|
if (pTrans == NULL) goto DROP_MNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||||
|
|
|
@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
||||||
qnodeObj.createdTime = taosGetTimestampMs();
|
qnodeObj.createdTime = taosGetTimestampMs();
|
||||||
qnodeObj.updateTime = qnodeObj.createdTime;
|
qnodeObj.updateTime = qnodeObj.createdTime;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_QNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_QNODE_OVER;
|
if (pTrans == NULL) goto CREATE_QNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
@ -366,7 +366,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
|
||||||
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) {
|
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pReq, SQnodeObj *pObj) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_QNODE_OVER;
|
if (pTrans == NULL) goto DROP_QNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
||||||
|
|
|
@ -21,7 +21,7 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
|
||||||
#define TSDB_SNODE_VER_NUMBER 1
|
#define TSDB_SNODE_VER_NUMBER 1
|
||||||
#define TSDB_SNODE_RESERVE_SIZE 64
|
#define TSDB_SNODE_RESERVE_SIZE 64
|
||||||
|
|
||||||
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
|
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
|
||||||
|
@ -248,7 +248,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode
|
||||||
snodeObj.createdTime = taosGetTimestampMs();
|
snodeObj.createdTime = taosGetTimestampMs();
|
||||||
snodeObj.updateTime = snodeObj.createdTime;
|
snodeObj.updateTime = snodeObj.createdTime;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_SNODE_OVER;
|
if (pTrans == NULL) goto CREATE_SNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
@ -368,7 +368,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
|
||||||
static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) {
|
static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pReq, SSnodeObj *pObj) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_SNODE_OVER;
|
if (pTrans == NULL) goto DROP_SNODE_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
||||||
|
|
|
@ -530,10 +530,11 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto CREATE_STB_OVER;
|
if (pTrans == NULL) goto CREATE_STB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||||
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
|
|
||||||
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto CREATE_STB_OVER;
|
||||||
|
@ -1021,10 +1022,11 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
|
||||||
if (code != 0) goto ALTER_STB_OVER;
|
if (code != 0) goto ALTER_STB_OVER;
|
||||||
|
|
||||||
code = -1;
|
code = -1;
|
||||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_STB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto ALTER_STB_OVER;
|
if (pTrans == NULL) goto ALTER_STB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||||
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
|
|
||||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||||
|
@ -1159,10 +1161,11 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
|
|
||||||
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_STB, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) goto DROP_STB_OVER;
|
if (pTrans == NULL) goto DROP_STB_OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||||
|
mndTransSetDbInfo(pTrans, pDb);
|
||||||
|
|
||||||
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto DROP_STB_OVER;
|
||||||
|
|
|
@ -369,8 +369,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont;
|
SMqDoRebalanceMsg *pReq = pMsg->rpcMsg.pCont;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
||||||
mInfo("mq rebalance start");
|
mInfo("mq rebalance start");
|
||||||
|
@ -969,7 +969,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
oldTopicNum = taosArrayGetSize(oldSub);
|
oldTopicNum = taosArrayGetSize(oldSub);
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
// TODO: free memory
|
// TODO: free memory
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -252,7 +252,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
|
||||||
topicObj.logicalPlan = pCreate->logicalPlan;
|
topicObj.logicalPlan = pCreate->logicalPlan;
|
||||||
topicObj.sqlLen = strlen(pCreate->sql);
|
topicObj.sqlLen = strlen(pCreate->sql);
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -343,7 +343,7 @@ CREATE_TOPIC_OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
|
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndAuth.h"
|
#include "mndAuth.h"
|
||||||
|
#include "mndDb.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
|
@ -54,7 +55,8 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
|
||||||
|
|
||||||
static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
|
static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
|
||||||
static void mndTransSendRpcRsp(STrans *pTrans);
|
static void mndTransSendRpcRsp(STrans *pTrans);
|
||||||
static int32_t mndProcessTransReq(SMnodeMsg *pMsg);
|
static int32_t mndProcessTransReq(SMnodeMsg *pReq);
|
||||||
|
static int32_t mndProcessKillTransReq(SMnodeMsg *pReq);
|
||||||
|
|
||||||
static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetTransMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
|
@ -70,6 +72,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
|
||||||
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
|
.deleteFp = (SdbDeleteFp)mndTransActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
|
||||||
|
|
||||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndGetTransMeta);
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TRANS, mndRetrieveTrans);
|
||||||
|
@ -122,8 +125,12 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
|
SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
|
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
|
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
|
||||||
|
SDB_SET_INT16(pRaw, dataPos, pTrans->transType, TRANS_ENCODE_OVER)
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, TRANS_ENCODE_OVER)
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, TRANS_ENCODE_OVER)
|
||||||
|
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_ENCODE_OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
|
SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER)
|
SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER)
|
SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER)
|
||||||
|
@ -214,11 +221,32 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
pTrans = sdbGetRowObj(pRow);
|
pTrans = sdbGetRowObj(pRow);
|
||||||
if (pTrans == NULL) goto TRANS_DECODE_OVER;
|
if (pTrans == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
|
||||||
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
|
||||||
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
|
||||||
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
|
||||||
pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
|
int16_t type = 0;
|
||||||
pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
|
int16_t policy = 0;
|
||||||
|
int16_t stage = 0;
|
||||||
|
SDB_GET_INT16(pRaw, dataPos, &policy, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT16(pRaw, dataPos, &stage, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT16(pRaw, dataPos, &type, TRANS_DECODE_OVER)
|
||||||
|
pTrans->policy = policy;
|
||||||
|
pTrans->stage = stage;
|
||||||
|
pTrans->transType = type;
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)
|
||||||
|
|
||||||
|
pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *));
|
||||||
|
pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *));
|
||||||
|
pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *));
|
||||||
|
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
|
||||||
|
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
|
||||||
|
|
||||||
if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
|
if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
|
||||||
if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
|
if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
@ -226,15 +254,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
|
if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
|
||||||
if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;
|
if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;
|
||||||
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->stage, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < redoLogNum; ++i) {
|
for (int32_t i = 0; i < redoLogNum; ++i) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
|
||||||
pData = malloc(dataLen);
|
pData = malloc(dataLen);
|
||||||
|
@ -392,7 +411,7 @@ static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
|
||||||
sdbRelease(pSdb, pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
}
|
}
|
||||||
|
|
||||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
|
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq) {
|
||||||
STrans *pTrans = calloc(1, sizeof(STrans));
|
STrans *pTrans = calloc(1, sizeof(STrans));
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -403,6 +422,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
|
||||||
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
|
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
|
||||||
pTrans->stage = TRN_STAGE_PREPARE;
|
pTrans->stage = TRN_STAGE_PREPARE;
|
||||||
pTrans->policy = policy;
|
pTrans->policy = policy;
|
||||||
|
pTrans->transType = type;
|
||||||
pTrans->rpcHandle = pReq->handle;
|
pTrans->rpcHandle = pReq->handle;
|
||||||
pTrans->rpcAHandle = pReq->ahandle;
|
pTrans->rpcAHandle = pReq->ahandle;
|
||||||
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
|
||||||
|
@ -494,6 +514,11 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
|
||||||
pTrans->rpcRspLen = contLen;
|
pTrans->rpcRspLen = contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
|
||||||
|
pTrans->dbUid = pDb->uid;
|
||||||
|
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||||
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
|
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
|
||||||
if (pRaw == NULL) {
|
if (pRaw == NULL) {
|
||||||
|
@ -994,6 +1019,87 @@ static int32_t mndProcessTransReq(SMnodeMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
SArray *pArray = NULL;
|
||||||
|
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
|
||||||
|
pArray = pTrans->redoActions;
|
||||||
|
} else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
|
||||||
|
pArray = pTrans->undoActions;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t size = taosArrayGetSize(pArray);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
STransAction *pAction = taosArrayGet(pArray, i);
|
||||||
|
if (pAction == NULL) continue;
|
||||||
|
|
||||||
|
if (pAction->msgReceived == 0) {
|
||||||
|
mInfo("trans:%d, action:%d set processed", pTrans->id, i);
|
||||||
|
pAction->msgSent = 1;
|
||||||
|
pAction->msgReceived = 1;
|
||||||
|
pAction->errCode = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pAction->errCode != 0) {
|
||||||
|
mInfo("trans:%d, action:%d set processed, errCode from %s to success", pTrans->id, i,
|
||||||
|
tstrerror(pAction->errCode));
|
||||||
|
pAction->msgSent = 1;
|
||||||
|
pAction->msgReceived = 1;
|
||||||
|
pAction->errCode = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransExecute(pMnode, pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessKillTransReq(SMnodeMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
SKillTransReq killReq = {0};
|
||||||
|
int32_t code = -1;
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
|
if (tDeserializeSKillTransReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &killReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto KILL_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("trans:%d, start to kill", killReq.transId);
|
||||||
|
|
||||||
|
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
goto KILL_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pUser->superUser) {
|
||||||
|
terrno = TSDB_CODE_MND_NO_RIGHTS;
|
||||||
|
goto KILL_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTrans = mndAcquireTrans(pMnode, killReq.transId);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
|
||||||
|
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndKillTrans(pMnode, pTrans);
|
||||||
|
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
KILL_OVER:
|
||||||
|
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
|
mError("trans:%d, failed to kill since %s", killReq.transId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseTrans(pMnode, pTrans);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void mndTransPullup(SMnode *pMnode) {
|
void mndTransPullup(SMnode *pMnode) {
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -1099,7 +1205,12 @@ static int32_t mndRetrieveTrans(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
STR_TO_VARSTR(pWrite, pTrans->dbname);
|
char *name = mnGetDbStr(pTrans->dbname);
|
||||||
|
if (name != NULL) {
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]);
|
||||||
|
} else {
|
||||||
|
STR_TO_VARSTR(pWrite, "-");
|
||||||
|
}
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
|
|
@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
|
||||||
userObj.updateTime = userObj.createdTime;
|
userObj.updateTime = userObj.createdTime;
|
||||||
userObj.superUser = pCreate->superUser;
|
userObj.superUser = pCreate->superUser;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_CREATE_USER, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -350,7 +350,7 @@ CREATE_USER_OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) {
|
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SMnodeMsg *pReq) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("user:%s, failed to update since %s", pOld->user, terrstr());
|
mError("user:%s, failed to update since %s", pOld->user, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -511,7 +511,7 @@ ALTER_USER_OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) {
|
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pReq, SUserObj *pUser) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -256,6 +256,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev
|
||||||
// mnode-trans
|
// mnode-trans
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
||||||
|
|
||||||
// mnode-topic
|
// mnode-topic
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")
|
||||||
|
|
Loading…
Reference in New Issue