Merge pull request #13517 from taosdata/fix/mnode
feat: redistribute vgroup
This commit is contained in:
commit
dd8ef45aae
|
@ -218,6 +218,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390)
|
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390)
|
||||||
#define TSDB_CODE_MND_VGROUP_NOT_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0391)
|
#define TSDB_CODE_MND_VGROUP_NOT_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0391)
|
||||||
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392)
|
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392)
|
||||||
|
#define TSDB_CODE_MND_VGROUP_UN_CHANGED TAOS_DEF_ERROR_CODE(0, 0x0393)
|
||||||
|
|
||||||
// mnode-stable
|
// mnode-stable
|
||||||
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
|
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
|
||||||
|
|
|
@ -28,7 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
|
||||||
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
|
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
|
||||||
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
|
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
|
||||||
int32_t mndGetDnodeSize(SMnode *pMnode);
|
int32_t mndGetDnodeSize(SMnode *pMnode);
|
||||||
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs);
|
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId);
|
||||||
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj);
|
void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj);
|
||||||
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
|
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
|
||||||
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
|
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
|
||||||
|
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,15 +30,21 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
|
||||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
|
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
|
||||||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
||||||
|
|
||||||
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
|
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
|
||||||
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
SArray *mndBuildDnodesArray(SMnode *pMnode);
|
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
|
||||||
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
|
int32_t mndAddVnodeToVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray);
|
||||||
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pVgId);
|
int32_t mndRemoveVnodeFromVgroup(SMnode *, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid);
|
||||||
|
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby);
|
||||||
|
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
|
||||||
|
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
|
||||||
|
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
||||||
|
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
|
||||||
|
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby);
|
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
|
||||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildAlterVnodeReq(SMnode *, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -299,7 +299,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -409,7 +409,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -263,111 +263,6 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
|
||||||
bool standby) {
|
|
||||||
STransAction action = {0};
|
|
||||||
|
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
|
||||||
if (pDnode == NULL) return -1;
|
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
|
|
||||||
int32_t contLen = 0;
|
|
||||||
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby);
|
|
||||||
if (pReq == NULL) return -1;
|
|
||||||
|
|
||||||
action.pCont = pReq;
|
|
||||||
action.contLen = contLen;
|
|
||||||
action.msgType = TDMT_DND_CREATE_VNODE;
|
|
||||||
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
|
||||||
STransAction action = {0};
|
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
|
|
||||||
int32_t contLen = sizeof(SMsgHead);
|
|
||||||
SMsgHead *pHead = taosMemoryMalloc(contLen);
|
|
||||||
if (pHead == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pHead->contLen = htonl(contLen);
|
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
|
||||||
|
|
||||||
action.pCont = pHead;
|
|
||||||
action.contLen = contLen;
|
|
||||||
action.msgType = TDMT_VND_ALTER_CONFIRM;
|
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pHead);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
|
|
||||||
STransAction action = {0};
|
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
|
||||||
|
|
||||||
int32_t contLen = 0;
|
|
||||||
void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen);
|
|
||||||
if (pReq == NULL) return -1;
|
|
||||||
|
|
||||||
action.pCont = pReq;
|
|
||||||
action.contLen = contLen;
|
|
||||||
action.msgType = msgType;
|
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
|
||||||
bool isRedo) {
|
|
||||||
STransAction action = {0};
|
|
||||||
|
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
|
||||||
if (pDnode == NULL) return -1;
|
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
|
|
||||||
int32_t contLen = 0;
|
|
||||||
void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
|
||||||
if (pReq == NULL) return -1;
|
|
||||||
|
|
||||||
action.pCont = pReq;
|
|
||||||
action.contLen = contLen;
|
|
||||||
action.msgType = TDMT_DND_DROP_VNODE;
|
|
||||||
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
|
|
||||||
|
|
||||||
if (isRedo) {
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
|
static int32_t mndCheckDbName(const char *dbName, SUserObj *pUser) {
|
||||||
char *pos = strstr(dbName, TS_PATH_DELIMITER);
|
char *pos = strstr(dbName, TS_PATH_DELIMITER);
|
||||||
if (pos == NULL) {
|
if (pos == NULL) {
|
||||||
|
@ -795,7 +690,7 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SArray *pArray = mndBuildDnodesArray(pMnode);
|
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
|
@ -1742,3 +1637,4 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -254,7 +254,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
|
||||||
return sdbGetSize(pSdb, SDB_DNODE);
|
return sdbGetSize(pSdb, SDB_DNODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
|
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
|
||||||
int64_t interval = TABS(pDnode->lastAccessTime - curMs);
|
int64_t interval = TABS(pDnode->lastAccessTime - curMs);
|
||||||
if (interval > 5000 * tsStatusInterval) {
|
if (interval > 5000 * tsStatusInterval) {
|
||||||
if (pDnode->rebootTime > 0) {
|
if (pDnode->rebootTime > 0) {
|
||||||
|
@ -393,7 +393,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
|
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
|
||||||
int64_t curMs = taosGetTimestampMs();
|
int64_t curMs = taosGetTimestampMs();
|
||||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||||
bool dnodeChanged = (statusReq.dnodeVer != dnodeVer);
|
bool dnodeChanged = (statusReq.dnodeVer != dnodeVer);
|
||||||
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
|
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
|
||||||
bool needCheck = !online || dnodeChanged || reboot;
|
bool needCheck = !online || dnodeChanged || reboot;
|
||||||
|
@ -542,7 +542,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
|
||||||
goto CREATE_DNODE_OVER;
|
goto CREATE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto CREATE_DNODE_OVER;
|
goto CREATE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -559,30 +559,36 @@ CREATE_DNODE_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode) {
|
static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj) {
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_GLOBAL, pReq);
|
int32_t code = -1;
|
||||||
if (pTrans == NULL) {
|
SSdbRaw *pRaw = NULL;
|
||||||
mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr());
|
STrans *pTrans = NULL;
|
||||||
return -1;
|
|
||||||
}
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||||
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
|
mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndDnodeActionEncode(pDnode);
|
pRaw = mndDnodeActionEncode(pDnode);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
if (pRaw == NULL || mndTransAppendRedolog(pTrans, pRaw) != 0) goto _OVER;
|
||||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
|
||||||
mndTransDrop(pTrans);
|
pRaw = NULL;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
pRaw = mndDnodeActionEncode(pDnode);
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||||
mndTransDrop(pTrans);
|
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
|
||||||
return -1;
|
pRaw = NULL;
|
||||||
}
|
|
||||||
|
|
||||||
|
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pMObj) != 0) goto _OVER;
|
||||||
|
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id) != 0) goto _OVER;
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
sdbFreeRaw(pRaw);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
||||||
|
@ -595,42 +601,53 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto DROP_DNODE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("dnode:%d, start to drop", dropReq.dnodeId);
|
mDebug("dnode:%d, start to drop", dropReq.dnodeId);
|
||||||
|
|
||||||
if (dropReq.dnodeId <= 0) {
|
if (dropReq.dnodeId <= 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_DNODE_ID;
|
terrno = TSDB_CODE_MND_INVALID_DNODE_ID;
|
||||||
goto DROP_DNODE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
|
pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
|
||||||
if (pDnode == NULL) {
|
if (pDnode == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||||
goto DROP_DNODE_OVER;
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
|
||||||
|
terrno = TSDB_CODE_NODE_OFFLINE;
|
||||||
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
|
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
|
||||||
if (pMObj != NULL) {
|
if (pMObj != NULL) {
|
||||||
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
|
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
|
||||||
goto DROP_DNODE_OVER;
|
terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
if (pMnode->selfDnodeId == dropReq.dnodeId) {
|
||||||
|
terrno = TSDB_CODE_MND_CANT_DROP_MASTER;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
goto DROP_DNODE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto DROP_DNODE_OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndDropDnode(pMnode, pReq, pDnode);
|
code = mndDropDnode(pMnode, pReq, pDnode, pMObj);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
DROP_DNODE_OVER:
|
_OVER:
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
|
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
|
||||||
}
|
}
|
||||||
|
@ -638,7 +655,6 @@ DROP_DNODE_OVER:
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
mndReleaseMnode(pMnode, pMObj);
|
mndReleaseMnode(pMnode, pMObj);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -736,7 +752,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode);
|
pShow->pIter = sdbFetch(pSdb, SDB_DNODE, pShow->pIter, (void **)&pDnode);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||||
|
|
||||||
cols = 0;
|
cols = 0;
|
||||||
|
|
||||||
|
|
|
@ -529,7 +529,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
SMonDnodeDesc desc = {0};
|
SMonDnodeDesc desc = {0};
|
||||||
desc.dnode_id = pObj->id;
|
desc.dnode_id = pObj->id;
|
||||||
tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
|
tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
|
||||||
if (mndIsDnodeOnline(pMnode, pObj, ms)) {
|
if (mndIsDnodeOnline(pObj, ms)) {
|
||||||
tstrncpy(desc.status, "ready", sizeof(desc.status));
|
tstrncpy(desc.status, "ready", sizeof(desc.status));
|
||||||
} else {
|
} else {
|
||||||
tstrncpy(desc.status, "offline", sizeof(desc.status));
|
tstrncpy(desc.status, "offline", sizeof(desc.status));
|
||||||
|
|
|
@ -358,9 +358,9 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
|
||||||
|
|
||||||
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
|
||||||
|
@ -408,7 +408,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mndIsDnodeOnline(pMnode, pDnode, taosGetTimestampMs())) {
|
if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) {
|
||||||
terrno = TSDB_CODE_NODE_OFFLINE;
|
terrno = TSDB_CODE_NODE_OFFLINE;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -419,7 +419,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -535,18 +535,25 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
if (pObj == NULL) return 0;
|
||||||
|
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1;
|
||||||
|
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1;
|
||||||
|
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) return -1;
|
||||||
|
if (mndTransAppendNullLog(pTrans) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
|
static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
|
||||||
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
|
|
||||||
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
|
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
|
||||||
if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -596,7 +603,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -642,7 +649,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
if (pObj->id == pMnode->selfDnodeId) {
|
if (pObj->id == pMnode->selfDnodeId) {
|
||||||
roles = syncStr(TAOS_SYNC_STATE_LEADER);
|
roles = syncStr(TAOS_SYNC_STATE_LEADER);
|
||||||
}
|
}
|
||||||
if (pObj->pDnode && mndIsDnodeOnline(pMnode, pObj->pDnode, curMs)) {
|
if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) {
|
||||||
roles = syncStr(pObj->state);
|
roles = syncStr(pObj->state);
|
||||||
}
|
}
|
||||||
char b2[12 + VARSTR_HEADER_SIZE] = {0};
|
char b2[12 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
|
|
@ -301,7 +301,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -411,7 +411,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -522,10 +522,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||||
if (pTrans == NULL) goto _OVER;
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
|
||||||
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
|
||||||
mndTransSetDbName(pTrans, pDb->name);
|
mndTransSetDbName(pTrans, pDb->name);
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
|
||||||
|
|
||||||
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
|
||||||
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
|
||||||
|
|
|
@ -307,7 +307,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -419,7 +419,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckNodeAuth(pUser)) {
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,11 +15,13 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
#include "mndAuth.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
#include "mndUser.h"
|
||||||
|
|
||||||
#define VGROUP_VER_NUMBER 1
|
#define VGROUP_VER_NUMBER 1
|
||||||
#define VGROUP_RESERVE_SIZE 64
|
#define VGROUP_RESERVE_SIZE 64
|
||||||
|
@ -34,6 +36,9 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||||
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
||||||
|
|
||||||
|
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
|
||||||
|
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitVgroup(SMnode *pMnode) {
|
int32_t mndInitVgroup(SMnode *pMnode) {
|
||||||
SSdbTable table = {
|
SSdbTable table = {
|
||||||
.sdbType = SDB_VGROUP,
|
.sdbType = SDB_VGROUP,
|
||||||
|
@ -344,9 +349,14 @@ static bool mndResetDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
||||||
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
|
||||||
SDnodeObj *pDnode = pObj;
|
SDnodeObj *pDnode = pObj;
|
||||||
SArray *pArray = p1;
|
SArray *pArray = p1;
|
||||||
|
int32_t exceptDnodeId = *(int32_t *)p2;
|
||||||
|
|
||||||
|
if (exceptDnodeId == pDnode->id) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t curMs = taosGetTimestampMs();
|
int64_t curMs = taosGetTimestampMs();
|
||||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||||
bool isMnode = mndIsMnode(pMnode, pDnode->id);
|
bool isMnode = mndIsMnode(pMnode, pDnode->id);
|
||||||
pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
|
pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
|
||||||
|
|
||||||
|
@ -363,7 +373,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
|
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
|
||||||
|
|
||||||
|
@ -374,7 +384,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
|
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
|
||||||
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, NULL, NULL);
|
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL);
|
||||||
return pArray;
|
return pArray;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,7 +432,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
|
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
SArray *pArray = mndBuildDnodesArray(pMnode);
|
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
if (pArray == NULL) return -1;
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
pVgroup->vgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
|
@ -451,7 +461,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pArray = mndBuildDnodesArray(pMnode);
|
pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
if (pArray == NULL) goto _OVER;
|
if (pArray == NULL) goto _OVER;
|
||||||
|
|
||||||
mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
|
mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
|
||||||
|
@ -501,86 +511,6 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
|
|
||||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
|
||||||
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
|
|
||||||
for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
|
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
|
||||||
|
|
||||||
bool used = false;
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
|
||||||
if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
|
|
||||||
used = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (used) continue;
|
|
||||||
|
|
||||||
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
|
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVgid->dnodeId = pDnode->id;
|
|
||||||
pVgid->role = TAOS_SYNC_STATE_ERROR;
|
|
||||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica,
|
|
||||||
pVgid->dnodeId);
|
|
||||||
|
|
||||||
pVgroup->replica++;
|
|
||||||
pDnode->numOfVnodes++;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
|
||||||
mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) {
|
|
||||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
|
||||||
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = -1;
|
|
||||||
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
|
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
|
||||||
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
|
||||||
if (pVgid->dnodeId == pDnode->id) {
|
|
||||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
|
|
||||||
pDnode->numOfVnodes--;
|
|
||||||
pVgroup->replica--;
|
|
||||||
*pDelVgid = *pVgid;
|
|
||||||
*pVgid = pVgroup->vnodeGid[pVgroup->replica];
|
|
||||||
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
|
||||||
code = 0;
|
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_OVER:
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TSDB_CODE_APP_ERROR;
|
|
||||||
mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
|
||||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
|
SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
|
|
||||||
|
@ -678,7 +608,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
||||||
bool online = false;
|
bool online = false;
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
online = mndIsDnodeOnline(pDnode, curMs);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -797,3 +727,442 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
|
||||||
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
|
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
|
||||||
|
for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
||||||
|
|
||||||
|
bool used = false;
|
||||||
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
|
if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
|
||||||
|
used = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (used) continue;
|
||||||
|
|
||||||
|
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
|
||||||
|
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVgid->dnodeId = pDnode->id;
|
||||||
|
pVgid->role = TAOS_SYNC_STATE_ERROR;
|
||||||
|
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica, pVgid->dnodeId);
|
||||||
|
|
||||||
|
pVgroup->replica++;
|
||||||
|
pDnode->numOfVnodes++;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||||
|
mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) {
|
||||||
|
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
|
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = -1;
|
||||||
|
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
||||||
|
|
||||||
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
||||||
|
if (pVgid->dnodeId == pDnode->id) {
|
||||||
|
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
|
||||||
|
pDnode->numOfVnodes--;
|
||||||
|
pVgroup->replica--;
|
||||||
|
*pDelVgid = *pVgid;
|
||||||
|
*pVgid = pVgroup->vnodeGid[pVgroup->replica];
|
||||||
|
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
||||||
|
code = 0;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TSDB_CODE_APP_ERROR;
|
||||||
|
mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
||||||
|
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
||||||
|
bool standby) {
|
||||||
|
STransAction action = {0};
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
if (pDnode == NULL) return -1;
|
||||||
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby);
|
||||||
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = TDMT_DND_CREATE_VNODE;
|
||||||
|
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
||||||
|
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
|
int32_t contLen = sizeof(SMsgHead);
|
||||||
|
SMsgHead *pHead = taosMemoryMalloc(contLen);
|
||||||
|
if (pHead == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pHead->contLen = htonl(contLen);
|
||||||
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
|
|
||||||
|
action.pCont = pHead;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = TDMT_VND_ALTER_CONFIRM;
|
||||||
|
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pHead);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen);
|
||||||
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = msgType;
|
||||||
|
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
||||||
|
bool isRedo) {
|
||||||
|
STransAction action = {0};
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
if (pDnode == NULL) return -1;
|
||||||
|
action.epSet = mndGetDnodeEpset(pDnode);
|
||||||
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
|
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
||||||
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = TDMT_DND_DROP_VNODE;
|
||||||
|
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||||
|
|
||||||
|
if (isRedo) {
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
|
||||||
|
SArray *pArray) {
|
||||||
|
SVgObj newVg = {0};
|
||||||
|
memcpy(&newVg, pVgroup, sizeof(SVgObj));
|
||||||
|
|
||||||
|
mInfo("vgId:%d, vgroup info before move, replica:%d", newVg.vgId, newVg.replica);
|
||||||
|
for (int32_t i = 0; i < newVg.replica; ++i) {
|
||||||
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
|
||||||
|
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[1], true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||||
|
|
||||||
|
mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId);
|
||||||
|
newVg.replica--;
|
||||||
|
SVnodeGid del = newVg.vnodeGid[vnIndex];
|
||||||
|
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
|
||||||
|
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||||
|
|
||||||
|
mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
|
||||||
|
for (int32_t i = 0; i < newVg.replica; ++i) {
|
||||||
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId) {
|
||||||
|
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId);
|
||||||
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
int32_t vnIndex = -1;
|
||||||
|
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
|
if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
|
||||||
|
vnIndex = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vnIndex != -1) {
|
||||||
|
mInfo("vgId:%d, vnode:%d will be removed from dnode:%d", pVgroup->vgId, vnIndex, delDnodeId);
|
||||||
|
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||||
|
mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||||
|
int32_t newDnodeId) {
|
||||||
|
mDebug("vgId:%d, will add 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
|
||||||
|
|
||||||
|
SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
|
||||||
|
pVgroup->replica++;
|
||||||
|
pGid->dnodeId = newDnodeId;
|
||||||
|
pGid->role = TAOS_SYNC_STATE_ERROR;
|
||||||
|
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||||
|
int32_t delDnodeId) {
|
||||||
|
mDebug("vgId:%d, will remove 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
|
||||||
|
|
||||||
|
SVnodeGid *pGid = NULL;
|
||||||
|
SVnodeGid delGid = {0};
|
||||||
|
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
|
if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
|
||||||
|
pGid = &pVgroup->vnodeGid[i];
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pGid == NULL) return 0;
|
||||||
|
|
||||||
|
memcpy(&delGid, pGid, sizeof(SVnodeGid));
|
||||||
|
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
|
||||||
|
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
|
||||||
|
pVgroup->replica--;
|
||||||
|
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
|
||||||
|
SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
|
||||||
|
SDnodeObj *pOld3) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SSdbRaw *pRaw = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
|
||||||
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||||
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
mDebug("trans:%d, used to drop redistribute vgId:%d", pTrans->id, pVgroup->vgId);
|
||||||
|
|
||||||
|
SVgObj newVg = {0};
|
||||||
|
memcpy(&newVg, pVgroup, sizeof(SVgObj));
|
||||||
|
mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
|
||||||
|
for (int32_t i = 0; i < newVg.replica; ++i) {
|
||||||
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER;
|
||||||
|
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER;
|
||||||
|
if (pNew2 != NULL) {
|
||||||
|
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id) != 0) goto _OVER;
|
||||||
|
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id) != 0) goto _OVER;
|
||||||
|
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id) != 0) goto _OVER;
|
||||||
|
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id) != 0) goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRaw = mndVgroupActionEncode(&newVg);
|
||||||
|
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||||
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
pRaw = NULL;
|
||||||
|
|
||||||
|
mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
|
||||||
|
for (int32_t i = 0; i < newVg.replica; ++i) {
|
||||||
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
sdbFreeRaw(pRaw);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
SDnodeObj *pNew1 = NULL;
|
||||||
|
SDnodeObj *pNew2 = NULL;
|
||||||
|
SDnodeObj *pNew3 = NULL;
|
||||||
|
SDnodeObj *pOld1 = NULL;
|
||||||
|
SDnodeObj *pOld2 = NULL;
|
||||||
|
SDnodeObj *pOld3 = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
int64_t curMs = taosGetTimestampMs();
|
||||||
|
SMDropMnodeReq redReq = {0};
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
mDebug("vgId:%d, start to redistribute", 2);
|
||||||
|
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVgroup = mndAcquireVgroup(pMnode, 2);
|
||||||
|
if (pVgroup == NULL) goto _OVER;
|
||||||
|
|
||||||
|
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||||
|
if (pDb == NULL) goto _OVER;
|
||||||
|
|
||||||
|
if (pVgroup->replica == 1) {
|
||||||
|
pNew1 = mndAcquireDnode(pMnode, 1);
|
||||||
|
pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
|
||||||
|
if (pNew1 == NULL || pOld1 == NULL) goto _OVER;
|
||||||
|
if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs)) {
|
||||||
|
terrno = TSDB_CODE_NODE_OFFLINE;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
if (pNew1 == pOld1) {
|
||||||
|
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
if (mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL) != 0) goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVgroup->replica == 3) {
|
||||||
|
pNew1 = mndAcquireDnode(pMnode, 1);
|
||||||
|
pNew2 = mndAcquireDnode(pMnode, 2);
|
||||||
|
pNew3 = mndAcquireDnode(pMnode, 3);
|
||||||
|
pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
|
||||||
|
pOld2 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[1].dnodeId);
|
||||||
|
pOld3 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[2].dnodeId);
|
||||||
|
if (pNew1 == NULL || pOld1 == NULL || pNew2 == NULL || pOld2 == NULL || pNew3 == NULL || pOld3 == NULL) goto _OVER;
|
||||||
|
if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs) || !mndIsDnodeOnline(pNew2, curMs) ||
|
||||||
|
!mndIsDnodeOnline(pOld2, curMs) || !mndIsDnodeOnline(pNew3, curMs) || !mndIsDnodeOnline(pOld3, curMs)) {
|
||||||
|
terrno = TSDB_CODE_NODE_OFFLINE;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
bool changed = true;
|
||||||
|
if (pNew1 != pOld1 || pNew1 != pOld2 || pNew1 != pOld3) changed = true;
|
||||||
|
if (pNew2 != pOld1 || pNew2 != pOld2 || pNew2 != pOld3) changed = true;
|
||||||
|
if (pNew3 != pOld1 || pNew3 != pOld2 || pNew3 != pOld3) changed = true;
|
||||||
|
if (!changed) {
|
||||||
|
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
if (mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3) != 0) goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
mDebug("vgId:%d, failed to redistribute since %s", 1, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseDnode(pMnode, pNew1);
|
||||||
|
mndReleaseDnode(pMnode, pNew2);
|
||||||
|
mndReleaseDnode(pMnode, pNew3);
|
||||||
|
mndReleaseDnode(pMnode, pOld1);
|
||||||
|
mndReleaseDnode(pMnode, pOld2);
|
||||||
|
mndReleaseDnode(pMnode, pOld3);
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; }
|
|
@ -42,8 +42,10 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw) {
|
void sdbFreeRaw(SSdbRaw *pRaw) {
|
||||||
|
if (pRaw != NULL) {
|
||||||
mTrace("raw:%p, is freed", pRaw);
|
mTrace("raw:%p, is freed", pRaw);
|
||||||
taosMemoryFree(pRaw);
|
taosMemoryFree(pRaw);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) {
|
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) {
|
||||||
|
|
|
@ -183,15 +183,15 @@ if $rows != 15 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print =============== drop dnode
|
print =============== drop dnode
|
||||||
sql drop dnode 2;
|
#sql drop dnode 2;
|
||||||
sql show dnodes;
|
#sql show dnodes;
|
||||||
if $rows != 1 then
|
#if $rows != 1 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
|
|
||||||
if $data00 != 1 then
|
#if $data00 != 1 then
|
||||||
return -1
|
# return -1
|
||||||
endi
|
#endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue