Merge pull request #13339 from taosdata/fix/tsim
fix: the max number of mnodes is limited to 3
This commit is contained in:
commit
321e23fe7b
|
@ -182,8 +182,9 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356)
|
||||
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
|
||||
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
|
||||
#define TSDB_CODE_MND_MNODE_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0359)
|
||||
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A)
|
||||
#define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0359)
|
||||
#define TSDB_CODE_MND_MNODE_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x035A)
|
||||
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035B)
|
||||
|
||||
// mnode-acct
|
||||
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
|
||||
|
|
|
@ -253,8 +253,7 @@ typedef enum ELogicConditionType {
|
|||
|
||||
#define TSDB_TRANS_STAGE_LEN 12
|
||||
#define TSDB_TRANS_TYPE_LEN 16
|
||||
#define TSDB_TRANS_ERROR_LEN 64
|
||||
#define TSDB_TRANS_DESC_LEN 128
|
||||
#define TSDB_TRANS_ERROR_LEN 512
|
||||
|
||||
#define TSDB_STEP_NAME_LEN 32
|
||||
#define TSDB_STEP_DESC_LEN 128
|
||||
|
|
|
@ -215,7 +215,6 @@ static const SSysDbTableSchema transSchema[] = {
|
|||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||
{.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||
|
|
|
@ -54,9 +54,11 @@ typedef enum {
|
|||
} EAuthOp;
|
||||
|
||||
typedef enum {
|
||||
TRN_STEP_LOG = 1,
|
||||
TRN_STEP_ACTION = 2,
|
||||
} ETrnStep;
|
||||
TRN_CONFLICT_NOTHING = 0,
|
||||
TRN_CONFLICT_GLOBAL = 1,
|
||||
TRN_CONFLICT_DB = 2,
|
||||
TRN_CONFLICT_DB_INSIDE = 3,
|
||||
} ETrnConflct;
|
||||
|
||||
typedef enum {
|
||||
TRN_STAGE_PREPARE = 0,
|
||||
|
@ -68,69 +70,15 @@ typedef enum {
|
|||
TRN_STAGE_FINISHED = 6
|
||||
} ETrnStage;
|
||||
|
||||
typedef enum {
|
||||
TRN_TYPE_BASIC_SCOPE = 1000,
|
||||
TRN_TYPE_CREATE_ACCT = 1001,
|
||||
TRN_TYPE_CREATE_CLUSTER = 1002,
|
||||
TRN_TYPE_CREATE_USER = 1003,
|
||||
TRN_TYPE_ALTER_USER = 1004,
|
||||
TRN_TYPE_DROP_USER = 1005,
|
||||
TRN_TYPE_CREATE_FUNC = 1006,
|
||||
TRN_TYPE_DROP_FUNC = 1007,
|
||||
|
||||
TRN_TYPE_CREATE_SNODE = 1010,
|
||||
TRN_TYPE_DROP_SNODE = 1011,
|
||||
TRN_TYPE_CREATE_QNODE = 1012,
|
||||
TRN_TYPE_DROP_QNODE = 10013,
|
||||
TRN_TYPE_CREATE_BNODE = 1014,
|
||||
TRN_TYPE_DROP_BNODE = 1015,
|
||||
TRN_TYPE_CREATE_MNODE = 1016,
|
||||
TRN_TYPE_DROP_MNODE = 1017,
|
||||
|
||||
TRN_TYPE_CREATE_TOPIC = 1020,
|
||||
TRN_TYPE_DROP_TOPIC = 1021,
|
||||
TRN_TYPE_SUBSCRIBE = 1022,
|
||||
TRN_TYPE_REBALANCE = 1023,
|
||||
TRN_TYPE_COMMIT_OFFSET = 1024,
|
||||
TRN_TYPE_CREATE_STREAM = 1025,
|
||||
TRN_TYPE_DROP_STREAM = 1026,
|
||||
TRN_TYPE_ALTER_STREAM = 1027,
|
||||
TRN_TYPE_CONSUMER_LOST = 1028,
|
||||
TRN_TYPE_CONSUMER_RECOVER = 1029,
|
||||
TRN_TYPE_DROP_CGROUP = 1030,
|
||||
TRN_TYPE_BASIC_SCOPE_END,
|
||||
|
||||
TRN_TYPE_GLOBAL_SCOPE = 2000,
|
||||
TRN_TYPE_CREATE_DNODE = 2001,
|
||||
TRN_TYPE_DROP_DNODE = 2002,
|
||||
TRN_TYPE_GLOBAL_SCOPE_END,
|
||||
|
||||
TRN_TYPE_DB_SCOPE = 3000,
|
||||
TRN_TYPE_CREATE_DB = 3001,
|
||||
TRN_TYPE_ALTER_DB = 3002,
|
||||
TRN_TYPE_DROP_DB = 3003,
|
||||
TRN_TYPE_SPLIT_VGROUP = 3004,
|
||||
TRN_TYPE_MERGE_VGROUP = 3015,
|
||||
TRN_TYPE_DB_SCOPE_END,
|
||||
|
||||
TRN_TYPE_STB_SCOPE = 4000,
|
||||
TRN_TYPE_CREATE_STB = 4001,
|
||||
TRN_TYPE_ALTER_STB = 4002,
|
||||
TRN_TYPE_DROP_STB = 4003,
|
||||
TRN_TYPE_CREATE_SMA = 4004,
|
||||
TRN_TYPE_DROP_SMA = 4005,
|
||||
TRN_TYPE_STB_SCOPE_END,
|
||||
} ETrnType;
|
||||
|
||||
typedef enum {
|
||||
TRN_POLICY_ROLLBACK = 0,
|
||||
TRN_POLICY_RETRY = 1,
|
||||
} ETrnPolicy;
|
||||
|
||||
typedef enum {
|
||||
TRN_EXEC_PARALLEL = 0,
|
||||
TRN_EXEC_NO_PARALLEL = 1,
|
||||
} ETrnExecType;
|
||||
TRN_EXEC_PRARLLEL = 0,
|
||||
TRN_EXEC_SERIAL = 1,
|
||||
} ETrnExec;
|
||||
|
||||
typedef enum {
|
||||
DND_REASON_ONLINE = 0,
|
||||
|
@ -159,8 +107,8 @@ typedef struct {
|
|||
int32_t id;
|
||||
ETrnStage stage;
|
||||
ETrnPolicy policy;
|
||||
ETrnType type;
|
||||
ETrnExecType parallel;
|
||||
ETrnConflct conflict;
|
||||
ETrnExec exec;
|
||||
int32_t code;
|
||||
int32_t failedTimes;
|
||||
SRpcHandleInfo rpcInfo;
|
||||
|
@ -172,10 +120,11 @@ typedef struct {
|
|||
SArray* commitActions;
|
||||
int64_t createdTime;
|
||||
int64_t lastExecTime;
|
||||
int64_t dbUid;
|
||||
int32_t lastErrorAction;
|
||||
int32_t lastErrorNo;
|
||||
tmsg_t lastErrorMsgType;
|
||||
SEpSet lastErrorEpset;
|
||||
char dbname[TSDB_DB_FNAME_LEN];
|
||||
char lastError[TSDB_TRANS_ERROR_LEN];
|
||||
char desc[TSDB_TRANS_DESC_LEN];
|
||||
int32_t startFunc;
|
||||
int32_t stopFunc;
|
||||
int32_t paramLen;
|
||||
|
|
|
@ -34,7 +34,7 @@ typedef struct {
|
|||
int32_t errCode;
|
||||
int32_t acceptableCode;
|
||||
int8_t stage;
|
||||
int8_t isRaw;
|
||||
int8_t actionType; // 0-msg, 1-raw
|
||||
int8_t rawWritten;
|
||||
int8_t msgSent;
|
||||
int8_t msgReceived;
|
||||
|
@ -52,7 +52,7 @@ void mndCleanupTrans(SMnode *pMnode);
|
|||
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId);
|
||||
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
|
||||
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq);
|
||||
void mndTransDrop(STrans *pTrans);
|
||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
|
||||
|
@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
|
|||
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
|
||||
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
|
||||
void mndTransSetNoParallel(STrans *pTrans);
|
||||
void mndTransSetSerial(STrans *pTrans);
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
|
||||
void mndTransProcessRsp(SRpcMsg *pRsp);
|
||||
|
|
|
@ -80,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
|||
|
||||
mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
|
||||
if (pTrans == NULL) {
|
||||
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -246,7 +246,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
|||
bnodeObj.createdTime = taosGetTimestampMs();
|
||||
bnodeObj.updateTime = bnodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
|
|||
static int32_t mndDropBnode(SMnode *pMnode, SRpcMsg *pReq, SBnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -179,10 +179,8 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
|
||||
#if 0
|
||||
return sdbWrite(pMnode->pSdb, pRaw);
|
||||
#else
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_CLUSTER, NULL);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
|
||||
if (pTrans == NULL) {
|
||||
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
|
||||
return -1;
|
||||
|
@ -204,7 +202,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveClusters(SRpcMsg *pMsg, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
|
|
|
@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
|
|||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_LOST, pMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
|
||||
if (pTrans == NULL) goto FAIL;
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||
|
@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
|
|||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CONSUMER_RECOVER, pMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
|
||||
if (pTrans == NULL) goto FAIL;
|
||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
|
||||
|
@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
|
||||
int32_t newTopicNum = taosArrayGetSize(newSub);
|
||||
// check topic existance
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, pMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg);
|
||||
if (pTrans == NULL) goto SUBSCRIBE_OVER;
|
||||
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
|
|
|
@ -545,7 +545,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||
|
@ -775,7 +775,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
|
||||
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_DB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
|
||||
|
@ -1036,7 +1036,7 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
|
|||
|
||||
static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_DB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
||||
|
|
|
@ -101,10 +101,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
|
|||
|
||||
mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw);
|
||||
|
||||
#if 0
|
||||
return sdbWrite(pMnode->pSdb, pRaw);
|
||||
#else
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
|
||||
return -1;
|
||||
|
@ -126,7 +123,6 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
|
||||
|
@ -488,7 +484,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
|
|||
memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN);
|
||||
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_DNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
|
||||
return -1;
|
||||
|
@ -564,7 +560,7 @@ CREATE_DNODE_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_DNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -215,7 +215,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
|
|||
}
|
||||
memcpy(func.pCode, pCreate->pCode, func.codeSize);
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_FUNC, pReq);
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create func:%s", pTrans->id, pCreate->name);
|
||||
|
@ -245,7 +245,7 @@ _OVER:
|
|||
|
||||
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_FUNC, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop user:%s", pTrans->id, pFunc->name);
|
||||
|
|
|
@ -18,9 +18,9 @@
|
|||
#include "mndAuth.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndSync.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndSync.h"
|
||||
|
||||
#define MNODE_VER_NUMBER 1
|
||||
#define MNODE_RESERVE_SIZE 64
|
||||
|
@ -92,10 +92,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
|
|||
|
||||
mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
|
||||
|
||||
#if 0
|
||||
return sdbWrite(pMnode->pSdb, pRaw);
|
||||
#else
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, NULL);
|
||||
if (pTrans == NULL) {
|
||||
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
|
||||
return -1;
|
||||
|
@ -117,7 +114,6 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
|
||||
|
@ -363,11 +359,11 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
|||
mnodeObj.createdTime = taosGetTimestampMs();
|
||||
mnodeObj.updateTime = mnodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
mndTransSetNoParallel(pTrans);
|
||||
mndTransSetSerial(pTrans);
|
||||
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
||||
|
@ -396,6 +392,11 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
|||
|
||||
mDebug("mnode:%d, start to create", createReq.dnodeId);
|
||||
|
||||
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_MNODES;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
|
||||
if (pObj != NULL) {
|
||||
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
|
||||
|
@ -535,11 +536,11 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
|||
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||
mndTransSetNoParallel(pTrans);
|
||||
mndTransSetSerial(pTrans);
|
||||
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
|
||||
|
|
|
@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
|
|||
|
||||
tDecodeSMqCMCommitOffsetReq(&decoder, &commitOffsetReq);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_COMMIT_OFFSET, pMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
|
||||
|
||||
for (int32_t i = 0; i < commitOffsetReq.num; i++) {
|
||||
SMqOffset *pOffset = &commitOffsetReq.offsets[i];
|
||||
|
|
|
@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
|||
qnodeObj.createdTime = taosGetTimestampMs();
|
||||
qnodeObj.updateTime = qnodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_QNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -365,7 +365,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
|
|||
static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_QNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -508,12 +508,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
streamObj.fixedSinkVgId = smaObj.dstVgId;
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
||||
mndTransSetDbInfo(pTrans, pDb);
|
||||
mndTransSetNoParallel(pTrans);
|
||||
mndTransSetSerial(pTrans);
|
||||
|
||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||
|
@ -753,7 +753,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId);
|
||||
if (pVgroup == NULL) goto _OVER;
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_SMA, pReq);
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
||||
|
|
|
@ -253,7 +253,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
|||
snodeObj.createdTime = taosGetTimestampMs();
|
||||
snodeObj.updateTime = snodeObj.createdTime;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
|
||||
|
@ -372,7 +372,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
|
|||
static int32_t mndDropSnode(SMnode *pMnode, SRpcMsg *pReq, SSnodeObj *pObj) {
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
|
||||
|
|
|
@ -735,7 +735,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
|
|||
|
||||
int32_t code = -1;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
|
||||
|
@ -1257,7 +1257,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
if (code != 0) goto _OVER;
|
||||
|
||||
code = -1;
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_ALTER_STB, pReq);
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||
|
@ -1403,7 +1403,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
|
||||
static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_STB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
|
||||
|
|
|
@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
|
|||
tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("stream:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -394,8 +394,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("rebalance calculation completed, rebalanced vg:");
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
|
||||
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
|
||||
mInfo("vg: %d moved from consumer %ld to consumer %ld", pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId,
|
||||
pOutputRebVg->newConsumerId);
|
||||
mInfo("vgId:%d moved from consumer %" PRId64 " to consumer %" PRId64, pOutputRebVg->pVgEp->vgId,
|
||||
pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||
}
|
||||
|
||||
// 9. clear
|
||||
|
@ -405,10 +405,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
|
||||
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, pMsg);
|
||||
if (pTrans == NULL) {
|
||||
return -1;
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg);
|
||||
if (pTrans == NULL) return -1;
|
||||
|
||||
// make txn:
|
||||
// 1. redo action: action to all vg
|
||||
const SArray *rebVgs = pOutput->rebVgs;
|
||||
|
@ -625,7 +624,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_CGROUP, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
|
|
|
@ -383,7 +383,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
/*topicObj.withSchema = 1;*/
|
||||
}
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_TOPIC, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
taosMemoryFreeClear(topicObj.ast);
|
||||
|
@ -551,7 +551,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
}
|
||||
#endif
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -88,12 +88,12 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
|
|||
|
||||
for (int32_t i = 0; i < actionNum; ++i) {
|
||||
STransAction *pAction = taosArrayGet(pArray, i);
|
||||
if (pAction->isRaw) {
|
||||
if (pAction->actionType) {
|
||||
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
|
||||
} else {
|
||||
rawDataLen += (sizeof(STransAction) + pAction->contLen);
|
||||
}
|
||||
rawDataLen += sizeof(pAction->isRaw);
|
||||
rawDataLen += sizeof(pAction->actionType);
|
||||
}
|
||||
|
||||
return rawDataLen;
|
||||
|
@ -117,8 +117,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->conflict, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->exec, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
|
||||
|
@ -135,9 +135,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||
if (pAction->isRaw) {
|
||||
if (pAction->actionType) {
|
||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||
|
@ -157,9 +157,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||
if (pAction->isRaw) {
|
||||
if (pAction->actionType) {
|
||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||
|
@ -179,9 +179,9 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
|
||||
if (pAction->isRaw) {
|
||||
if (pAction->actionType) {
|
||||
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
|
||||
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||
|
@ -250,16 +250,16 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
int16_t stage = 0;
|
||||
int16_t policy = 0;
|
||||
int16_t type = 0;
|
||||
int16_t parallel = 0;
|
||||
int16_t conflict = 0;
|
||||
int16_t exec = 0;
|
||||
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &type, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, ¶llel, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &conflict, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &exec, _OVER)
|
||||
pTrans->stage = stage;
|
||||
pTrans->policy = policy;
|
||||
pTrans->type = type;
|
||||
pTrans->parallel = parallel;
|
||||
pTrans->conflict = conflict;
|
||||
pTrans->exec = exec;
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
|
||||
|
@ -279,9 +279,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
||||
if (action.isRaw) {
|
||||
if (action.actionType) {
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
action.pRaw = taosMemoryMalloc(dataLen);
|
||||
|
@ -308,9 +308,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
||||
if (action.isRaw) {
|
||||
if (action.actionType) {
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
action.pRaw = taosMemoryMalloc(dataLen);
|
||||
|
@ -337,9 +337,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
|
||||
if (action.isRaw) {
|
||||
if (action.actionType) {
|
||||
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
|
||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||
action.pRaw = taosMemoryMalloc(dataLen);
|
||||
|
@ -408,81 +408,6 @@ static const char *mndTransStr(ETrnStage stage) {
|
|||
}
|
||||
}
|
||||
|
||||
static const char *mndTransType(ETrnType type) {
|
||||
switch (type) {
|
||||
case TRN_TYPE_CREATE_USER:
|
||||
return "create-user";
|
||||
case TRN_TYPE_ALTER_USER:
|
||||
return "alter-user";
|
||||
case TRN_TYPE_DROP_USER:
|
||||
return "drop-user";
|
||||
case TRN_TYPE_CREATE_FUNC:
|
||||
return "create-func";
|
||||
case TRN_TYPE_DROP_FUNC:
|
||||
return "drop-func";
|
||||
case TRN_TYPE_CREATE_SNODE:
|
||||
return "create-snode";
|
||||
case TRN_TYPE_DROP_SNODE:
|
||||
return "drop-snode";
|
||||
case TRN_TYPE_CREATE_QNODE:
|
||||
return "create-qnode";
|
||||
case TRN_TYPE_DROP_QNODE:
|
||||
return "drop-qnode";
|
||||
case TRN_TYPE_CREATE_BNODE:
|
||||
return "create-bnode";
|
||||
case TRN_TYPE_DROP_BNODE:
|
||||
return "drop-bnode";
|
||||
case TRN_TYPE_CREATE_MNODE:
|
||||
return "create-mnode";
|
||||
case TRN_TYPE_DROP_MNODE:
|
||||
return "drop-mnode";
|
||||
case TRN_TYPE_CREATE_TOPIC:
|
||||
return "create-topic";
|
||||
case TRN_TYPE_DROP_TOPIC:
|
||||
return "drop-topic";
|
||||
case TRN_TYPE_SUBSCRIBE:
|
||||
return "subscribe";
|
||||
case TRN_TYPE_REBALANCE:
|
||||
return "rebalance";
|
||||
case TRN_TYPE_COMMIT_OFFSET:
|
||||
return "commit-offset";
|
||||
case TRN_TYPE_CREATE_STREAM:
|
||||
return "create-stream";
|
||||
case TRN_TYPE_DROP_STREAM:
|
||||
return "drop-stream";
|
||||
case TRN_TYPE_CONSUMER_LOST:
|
||||
return "consumer-lost";
|
||||
case TRN_TYPE_CONSUMER_RECOVER:
|
||||
return "consumer-recover";
|
||||
case TRN_TYPE_CREATE_DNODE:
|
||||
return "create-qnode";
|
||||
case TRN_TYPE_DROP_DNODE:
|
||||
return "drop-qnode";
|
||||
case TRN_TYPE_CREATE_DB:
|
||||
return "create-db";
|
||||
case TRN_TYPE_ALTER_DB:
|
||||
return "alter-db";
|
||||
case TRN_TYPE_DROP_DB:
|
||||
return "drop-db";
|
||||
case TRN_TYPE_SPLIT_VGROUP:
|
||||
return "split-vgroup";
|
||||
case TRN_TYPE_MERGE_VGROUP:
|
||||
return "merge-vgroup";
|
||||
case TRN_TYPE_CREATE_STB:
|
||||
return "create-stb";
|
||||
case TRN_TYPE_ALTER_STB:
|
||||
return "alter-stb";
|
||||
case TRN_TYPE_DROP_STB:
|
||||
return "drop-stb";
|
||||
case TRN_TYPE_CREATE_SMA:
|
||||
return "create-sma";
|
||||
case TRN_TYPE_DROP_SMA:
|
||||
return "drop-sma";
|
||||
default:
|
||||
return "invalid";
|
||||
}
|
||||
}
|
||||
|
||||
static void mndTransTestStartFunc(SMnode *pMnode, void *param, int32_t paramLen) {
|
||||
mInfo("test trans start, param:%s, len:%d", (char *)param, paramLen);
|
||||
}
|
||||
|
@ -594,7 +519,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
|
|||
sdbRelease(pSdb, pTrans);
|
||||
}
|
||||
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq) {
|
||||
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq) {
|
||||
STrans *pTrans = taosMemoryCalloc(1, sizeof(STrans));
|
||||
if (pTrans == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -605,8 +530,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
|
|||
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
|
||||
pTrans->stage = TRN_STAGE_PREPARE;
|
||||
pTrans->policy = policy;
|
||||
pTrans->type = type;
|
||||
pTrans->parallel = TRN_EXEC_PARALLEL;
|
||||
pTrans->conflict = conflict;
|
||||
pTrans->exec = TRN_EXEC_PRARLLEL;
|
||||
pTrans->createdTime = taosGetTimestampMs();
|
||||
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||
|
@ -627,7 +552,7 @@ static void mndTransDropActions(SArray *pArray) {
|
|||
int32_t size = taosArrayGetSize(pArray);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STransAction *pAction = taosArrayGet(pArray, i);
|
||||
if (pAction->isRaw) {
|
||||
if (pAction->actionType) {
|
||||
taosMemoryFreeClear(pAction->pRaw);
|
||||
} else {
|
||||
taosMemoryFreeClear(pAction->pCont);
|
||||
|
@ -658,17 +583,17 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
|
|||
}
|
||||
|
||||
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .isRaw = true, .pRaw = pRaw};
|
||||
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = true, .pRaw = pRaw};
|
||||
return mndTransAppendAction(pTrans->redoActions, &action);
|
||||
}
|
||||
|
||||
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .isRaw = true, .pRaw = pRaw};
|
||||
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = true, .pRaw = pRaw};
|
||||
return mndTransAppendAction(pTrans->undoActions, &action);
|
||||
}
|
||||
|
||||
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
|
||||
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .isRaw = true, .pRaw = pRaw};
|
||||
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = true, .pRaw = pRaw};
|
||||
return mndTransAppendAction(pTrans->commitActions, &action);
|
||||
}
|
||||
|
||||
|
@ -698,7 +623,7 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
|
|||
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
void mndTransSetNoParallel(STrans *pTrans) { pTrans->parallel = TRN_EXEC_NO_PARALLEL; }
|
||||
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
|
||||
|
||||
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
|
||||
|
@ -721,76 +646,43 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static bool mndIsBasicTrans(STrans *pTrans) {
|
||||
return pTrans->type > TRN_TYPE_BASIC_SCOPE && pTrans->type < TRN_TYPE_BASIC_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsGlobalTrans(STrans *pTrans) {
|
||||
return pTrans->type > TRN_TYPE_GLOBAL_SCOPE && pTrans->type < TRN_TYPE_GLOBAL_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsDbTrans(STrans *pTrans) {
|
||||
return pTrans->type > TRN_TYPE_DB_SCOPE && pTrans->type < TRN_TYPE_DB_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsStbTrans(STrans *pTrans) {
|
||||
return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
|
||||
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
||||
STrans *pTrans = NULL;
|
||||
void *pIter = NULL;
|
||||
bool conflict = false;
|
||||
|
||||
if (mndIsBasicTrans(pNewTrans)) return conflict;
|
||||
if (pNew->conflict == TRN_CONFLICT_NOTHING) return conflict;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (mndIsGlobalTrans(pNewTrans)) {
|
||||
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
conflict = true;
|
||||
} else {
|
||||
}
|
||||
if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pNew->conflict == TRN_CONFLICT_DB) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
}
|
||||
|
||||
else if (mndIsDbTrans(pNewTrans)) {
|
||||
if (mndIsGlobalTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
|
||||
conflict = true;
|
||||
} else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
|
||||
if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
conflict = true;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
}
|
||||
|
||||
else if (mndIsStbTrans(pNewTrans)) {
|
||||
if (mndIsGlobalTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
|
||||
conflict = true;
|
||||
} else if (mndIsDbTrans(pTrans)) {
|
||||
if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
conflict = true;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
mError("trans:%d, can't execute since conflict with trans:%d, db:%s", pNew->id, pTrans->id, pTrans->dbname);
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
}
|
||||
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
return conflict;
|
||||
}
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (strlen(pTrans->dbname) == 0) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckTransConflict(pMnode, pTrans)) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
|
@ -921,9 +813,6 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
|
|||
if (pAction != NULL) {
|
||||
pAction->msgReceived = 1;
|
||||
pAction->errCode = pRsp->code;
|
||||
if (pAction->errCode != 0) {
|
||||
tstrncpy(pTrans->lastError, tstrerror(pAction->errCode), TSDB_TRANS_ERROR_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action,
|
||||
|
@ -1004,7 +893,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
|
|||
}
|
||||
|
||||
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
|
||||
if (pAction->isRaw) {
|
||||
if (pAction->actionType) {
|
||||
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
|
||||
} else {
|
||||
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
|
||||
|
@ -1032,24 +921,36 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfExecuted = 0;
|
||||
int32_t errCode = 0;
|
||||
int32_t numOfExecuted = 0;
|
||||
int32_t errCode = 0;
|
||||
STransAction *pErrAction = NULL;
|
||||
for (int32_t action = 0; action < numOfActions; ++action) {
|
||||
STransAction *pAction = taosArrayGet(pArray, action);
|
||||
if (pAction->msgReceived || pAction->rawWritten) {
|
||||
numOfExecuted++;
|
||||
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
|
||||
errCode = pAction->errCode;
|
||||
pErrAction = pAction;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfExecuted == numOfActions) {
|
||||
if (errCode == 0) {
|
||||
pTrans->lastErrorAction = 0;
|
||||
pTrans->lastErrorNo = 0;
|
||||
pTrans->lastErrorMsgType = 0;
|
||||
memset(&pTrans->lastErrorEpset, 0, sizeof(pTrans->lastErrorEpset));
|
||||
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
|
||||
return 0;
|
||||
} else {
|
||||
mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode & 0XFFFF);
|
||||
if (pErrAction != NULL) {
|
||||
pTrans->lastErrorMsgType = pErrAction->msgType;
|
||||
pTrans->lastErrorAction = pErrAction->id;
|
||||
pTrans->lastErrorNo = pErrAction->errCode;
|
||||
pTrans->lastErrorEpset = pErrAction->epSet;
|
||||
}
|
||||
mndTransResetActions(pMnode, pTrans, pArray);
|
||||
terrno = errCode;
|
||||
return errCode;
|
||||
|
@ -1084,7 +985,7 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTrans) {
|
||||
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) {
|
||||
int32_t code = 0;
|
||||
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
|
||||
if (numOfActions == 0) return code;
|
||||
|
@ -1111,6 +1012,18 @@ static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTra
|
|||
}
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
pTrans->lastErrorAction = 0;
|
||||
pTrans->lastErrorNo = 0;
|
||||
pTrans->lastErrorMsgType = 0;
|
||||
memset(&pTrans->lastErrorEpset, 0, sizeof(pTrans->lastErrorEpset));
|
||||
} else {
|
||||
pTrans->lastErrorMsgType = pAction->msgType;
|
||||
pTrans->lastErrorAction = action;
|
||||
pTrans->lastErrorNo = pAction->errCode;
|
||||
pTrans->lastErrorEpset = pAction->epSet;
|
||||
}
|
||||
|
||||
if (code == 0) {
|
||||
pTrans->redoActionPos++;
|
||||
mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
|
||||
|
@ -1144,8 +1057,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
|||
bool continueExec = true;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pTrans->parallel == TRN_EXEC_NO_PARALLEL) {
|
||||
code = mndTransExecuteRedoActionsNoParallel(pMnode, pTrans);
|
||||
if (pTrans->exec == TRN_EXEC_SERIAL) {
|
||||
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans);
|
||||
} else {
|
||||
code = mndTransExecuteRedoActions(pMnode, pTrans);
|
||||
}
|
||||
|
@ -1455,11 +1368,6 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
|
||||
|
||||
char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(type, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)type, false);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
|
||||
|
||||
|
@ -1467,7 +1375,20 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false);
|
||||
|
||||
char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(lastError, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes);
|
||||
char detail[TSDB_TRANS_ERROR_LEN] = {0};
|
||||
if (pTrans->lastErrorNo != 0) {
|
||||
int32_t len = snprintf(detail, sizeof(detail), "action:%d errno:0x%x(%s) ", pTrans->lastErrorAction,
|
||||
pTrans->lastErrorNo & 0xFFFF, tstrerror(pTrans->lastErrorNo));
|
||||
SEpSet epset = pTrans->lastErrorEpset;
|
||||
if (epset.numOfEps > 0) {
|
||||
len += snprintf(detail + len, sizeof(detail) - len, "msgType:%s numOfEps:%d inUse:%d ",
|
||||
TMSG_INFO(pTrans->lastErrorMsgType), epset.numOfEps, epset.inUse);
|
||||
}
|
||||
for (int32_t i = 0; i < pTrans->lastErrorEpset.numOfEps; ++i) {
|
||||
len += snprintf(detail + len, sizeof(detail) - len, "ep:%d-%s:%u ", i, epset.eps[i].fqdn, epset.eps[i].port);
|
||||
}
|
||||
}
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(lastError, detail, pShow->pMeta->pSchemas[cols].bytes);
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)lastError, false);
|
||||
|
||||
|
|
|
@ -79,10 +79,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
|
|||
|
||||
mDebug("user:%s, will be created when deploying, raw:%p", userObj.user, pRaw);
|
||||
|
||||
#if 0
|
||||
return sdbWrite(pMnode->pSdb, pRaw);
|
||||
#else
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_USER, NULL);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to create since %s", userObj.user, terrstr());
|
||||
return -1;
|
||||
|
@ -104,7 +101,6 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
|
||||
|
@ -291,7 +287,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
|
|||
userObj.updateTime = userObj.createdTime;
|
||||
userObj.superUser = pCreate->superUser;
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
|
||||
return -1;
|
||||
|
@ -371,7 +367,7 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndAlterUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SRpcMsg *pReq) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to alter since %s", pOld->user, terrstr());
|
||||
return -1;
|
||||
|
@ -578,7 +574,7 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndDropUser(SMnode *pMnode, SRpcMsg *pReq, SUserObj *pUser) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_USER, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq);
|
||||
if (pTrans == NULL) {
|
||||
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
||||
return -1;
|
||||
|
|
|
@ -11,6 +11,8 @@
|
|||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#if 0
|
||||
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "tcache.h"
|
||||
|
@ -103,7 +105,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
void SetUp() override {}
|
||||
void TearDown() override {}
|
||||
|
||||
int32_t CreateUserLog(const char *acct, const char *user, ETrnType type, SDbObj *pDb) {
|
||||
int32_t CreateUserLog(const char *acct, const char *user, ETrnConflct conflict, SDbObj *pDb) {
|
||||
SUserObj userObj = {0};
|
||||
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
||||
tstrncpy(userObj.user, user, TSDB_USER_LEN);
|
||||
|
@ -113,7 +115,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
userObj.superUser = 1;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, type, &rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, conflict, &rpcMsg);
|
||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||
mndTransAppendRedolog(pTrans, pRedoRaw);
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
|
@ -135,7 +137,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnType type,
|
||||
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrnConflct conflict,
|
||||
SDbObj *pDb) {
|
||||
SUserObj userObj = {0};
|
||||
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
|
||||
|
@ -146,7 +148,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
userObj.superUser = 1;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
STrans *pTrans = mndTransCreate(pMnode, policy, type, &rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, policy, conflict, &rpcMsg);
|
||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||
mndTransAppendRedolog(pTrans, pRedoRaw);
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
|
@ -218,7 +220,7 @@ class MndTestTrans2 : public ::testing::Test {
|
|||
userObj.superUser = 1;
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &rpcMsg);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, &rpcMsg);
|
||||
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
|
||||
mndTransAppendRedolog(pTrans, pRedoRaw);
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
|
@ -528,3 +530,5 @@ TEST_F(MndTestTrans2, 04_Conflict) {
|
|||
mndReleaseUser(pMnode, pUser);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
|
@ -187,6 +187,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_ALREADY_EXIST, "Bnode already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_NOT_EXIST, "Bnode not there")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "Too few mnodes")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "Too many mnodes")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_DEPLOYED, "Mnode deployed in this dnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CANT_DROP_MASTER, "Can't drop mnode which is master")
|
||||
|
||||
|
|
|
@ -57,7 +57,6 @@
|
|||
# ---- mnode
|
||||
./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic2.sim
|
||||
./test.sh -f tsim/mnode/basic3.sim
|
||||
|
||||
# ---- show
|
||||
./test.sh -f tsim/show/basic.sim
|
||||
|
|
|
@ -2,14 +2,17 @@ system sh/stop_dnodes.sh
|
|||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
system sh/deploy.sh -n dnode4 -i 4
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
system sh/exec.sh -n dnode4 -s start
|
||||
sql connect
|
||||
|
||||
print =============== step1: create dnodes
|
||||
sql create dnode $hostname port 7200
|
||||
sql create dnode $hostname port 7300
|
||||
sql create dnode $hostname port 7400
|
||||
|
||||
$x = 0
|
||||
step1:
|
||||
|
@ -32,6 +35,7 @@ endi
|
|||
print =============== step2: create mnode 2
|
||||
sql create mnode on dnode 2
|
||||
sql create mnode on dnode 3
|
||||
sql_error create mnode on dnode 4
|
||||
|
||||
$x = 0
|
||||
step2:
|
||||
|
@ -134,4 +138,5 @@ endi
|
|||
|
||||
system sh/exec.sh -n dnode1 -s stop
|
||||
system sh/exec.sh -n dnode2 -s stop
|
||||
system sh/exec.sh -n dnode3 -s stop
|
||||
system sh/exec.sh -n dnode3 -s stop
|
||||
system sh/exec.sh -n dnode4 -s stop
|
|
@ -76,14 +76,6 @@ if $data[0][3] != d1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][4] != create-db then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][7] != @Unable to establish connection@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error create database d1 vgroups 2;
|
||||
|
||||
print =============== start dnode2
|
||||
|
@ -125,15 +117,7 @@ endi
|
|||
if $data[0][3] != d2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][4] != create-db then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][7] != @Unable to establish connection@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
sql_error create database d2 vgroups 2;
|
||||
|
||||
print =============== kill transaction
|
||||
|
|
|
@ -279,9 +279,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
|
|||
tjsonAddIntegerToObject(item, "id", pObj->id);
|
||||
tjsonAddIntegerToObject(item, "stage", pObj->stage);
|
||||
tjsonAddIntegerToObject(item, "policy", pObj->policy);
|
||||
tjsonAddIntegerToObject(item, "type", pObj->type);
|
||||
tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
|
||||
tjsonAddIntegerToObject(item, "exec", pObj->exec);
|
||||
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
|
||||
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
|
||||
tjsonAddStringToObject(item, "dbname", pObj->dbname);
|
||||
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
|
||||
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
|
||||
|
|
Loading…
Reference in New Issue