Merge pull request #21188 from taosdata/FIX/TD-18702-3.0
feat: process split vgroup msg imp
This commit is contained in:
commit
510dbc2429
|
@ -60,7 +60,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
if (IsReq(pMsg)) {
|
if (IsReq(pMsg)) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
dGError("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(code), TMSG_INFO(pMsg->msgType));
|
dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType));
|
||||||
}
|
}
|
||||||
vmSendRsp(pMsg, code);
|
vmSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ typedef enum {
|
||||||
} ETrnPolicy;
|
} ETrnPolicy;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TRN_EXEC_PRARLLEL = 0,
|
TRN_EXEC_PARALLEL = 0,
|
||||||
TRN_EXEC_SERIAL = 1,
|
TRN_EXEC_SERIAL = 1,
|
||||||
} ETrnExec;
|
} ETrnExec;
|
||||||
|
|
||||||
|
|
|
@ -76,6 +76,7 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
|
||||||
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
|
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
|
||||||
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
|
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
|
||||||
void mndTransSetSerial(STrans *pTrans);
|
void mndTransSetSerial(STrans *pTrans);
|
||||||
|
void mndTransSetParallel(STrans *pTrans);
|
||||||
void mndTransSetOper(STrans *pTrans, EOperType oper);
|
void mndTransSetOper(STrans *pTrans, EOperType oper);
|
||||||
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans);
|
int32_t mndTrancCheckConflict(SMnode *pMnode, STrans *pTrans);
|
||||||
|
|
||||||
|
|
|
@ -643,7 +643,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
||||||
pTrans->stage = TRN_STAGE_PREPARE;
|
pTrans->stage = TRN_STAGE_PREPARE;
|
||||||
pTrans->policy = policy;
|
pTrans->policy = policy;
|
||||||
pTrans->conflict = conflict;
|
pTrans->conflict = conflict;
|
||||||
pTrans->exec = TRN_EXEC_PRARLLEL;
|
pTrans->exec = TRN_EXEC_PARALLEL;
|
||||||
pTrans->createdTime = taosGetTimestampMs();
|
pTrans->createdTime = taosGetTimestampMs();
|
||||||
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||||
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
|
||||||
|
@ -793,6 +793,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
|
||||||
|
|
||||||
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
|
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
|
||||||
|
|
||||||
|
void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
|
||||||
|
|
||||||
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
|
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
|
||||||
|
|
||||||
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
|
|
@ -2117,6 +2117,18 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) {
|
||||||
|
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
|
||||||
|
if (pRaw == NULL) goto _err;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err;
|
||||||
|
(void)sdbSetRawStatus(pRaw, vgStatus);
|
||||||
|
pRaw = NULL;
|
||||||
|
return 0;
|
||||||
|
_err:
|
||||||
|
sdbFreeRaw(pRaw);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
|
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = NULL;
|
STrans *pTrans = NULL;
|
||||||
|
@ -2195,28 +2207,16 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
||||||
if (pDb->cfg.replications != newVg1.replica) {
|
if (pDb->cfg.replications != newVg1.replica) {
|
||||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
pRaw = mndVgroupActionEncode(&newVg1);
|
if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER;
|
||||||
if (pRaw == NULL) goto _OVER;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
|
||||||
pRaw = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDb->cfg.replications != newVg2.replica) {
|
if (pDb->cfg.replications != newVg2.replica) {
|
||||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
pRaw = mndVgroupActionEncode(&newVg2);
|
if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER;
|
||||||
if (pRaw == NULL) goto _OVER;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
|
||||||
pRaw = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pRaw = mndVgroupActionEncode(pVgroup);
|
if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER;
|
||||||
if (pRaw == NULL) goto _OVER;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
|
|
||||||
pRaw = NULL;
|
|
||||||
|
|
||||||
memcpy(&dbObj, pDb, sizeof(SDbObj));
|
memcpy(&dbObj, pDb, sizeof(SDbObj));
|
||||||
if (dbObj.cfg.pRetensions != NULL) {
|
if (dbObj.cfg.pRetensions != NULL) {
|
||||||
|
@ -2243,42 +2243,13 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
|
extern int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq);
|
||||||
SMnode *pMnode = pReq->info.node;
|
|
||||||
int32_t code = -1;
|
|
||||||
SVgObj *pVgroup = NULL;
|
|
||||||
SDbObj *pDb = NULL;
|
|
||||||
|
|
||||||
SSplitVgroupReq req = {0};
|
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return mndProcessSplitVgroupMsgImp(pReq); }
|
||||||
if (tDeserializeSSplitVgroupReq(pReq->pCont, pReq->contLen, &req) != 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("vgId:%d, start to split", req.vgId);
|
#ifndef TD_ENTERPRISE
|
||||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) {
|
int32_t mndProcessSplitVgroupMsgImp(SRpcMsg *pReq) { return 0; }
|
||||||
goto _OVER;
|
#endif
|
||||||
}
|
|
||||||
|
|
||||||
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
|
|
||||||
if (pVgroup == NULL) goto _OVER;
|
|
||||||
|
|
||||||
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
|
||||||
if (pDb == NULL) goto _OVER;
|
|
||||||
|
|
||||||
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
|
|
||||||
if (code != 0) {
|
|
||||||
mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name);
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name);
|
|
||||||
|
|
||||||
_OVER:
|
|
||||||
mndReleaseVgroup(pMnode, pVgroup);
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||||
SDnodeObj *pSrc, SDnodeObj *pDst) {
|
SDnodeObj *pSrc, SDnodeObj *pDst) {
|
||||||
|
|
Loading…
Reference in New Issue