feat: redistribute vgroup

This commit is contained in:
Shengliang Guan 2022-06-06 16:21:56 +08:00
parent 5838956831
commit 362f37b5ed
8 changed files with 237 additions and 41 deletions

View File

@ -218,6 +218,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390)
#define TSDB_CODE_MND_VGROUP_NOT_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0391)
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392)
#define TSDB_CODE_MND_VGROUP_UN_CHANGED TAOS_DEF_ERROR_CODE(0, 0x0393)
// mnode-stable
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)

View File

@ -299,7 +299,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}
@ -409,7 +409,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}

View File

@ -542,7 +542,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
goto CREATE_DNODE_OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto CREATE_DNODE_OVER;
}
@ -566,6 +566,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to drop dnode:%d", pTrans->id, pDnode->id);
pRaw = mndDnodeActionEncode(pDnode);
@ -582,7 +583,6 @@ static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SM
if (mndSetMoveVgroupsInfoToTrans(pMnode, pTrans, pDnode->id) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);
code = 0;
_OVER:
@ -640,7 +640,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}

View File

@ -358,9 +358,9 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
@ -419,7 +419,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}
@ -549,12 +549,12 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);
code = 0;
_OVER:
@ -602,7 +602,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}

View File

@ -301,7 +301,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}
@ -411,7 +411,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}

View File

@ -510,10 +510,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;

View File

@ -307,7 +307,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}
@ -419,7 +419,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}

View File

@ -15,11 +15,13 @@
#define _DEFAULT_SOURCE
#include "mndVgroup.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#define VGROUP_VER_NUMBER 1
#define VGROUP_RESERVE_SIZE 64
@ -34,6 +36,9 @@ static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
int32_t mndInitVgroup(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_VGROUP,
@ -909,38 +914,38 @@ int32_t mndAddDropVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgOb
int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vnIndex,
SArray *pArray) {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
SVgObj newVg = {0};
memcpy(&newVg, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before move, replica:%d", newVgroup.vgId, newVgroup.replica);
for (int32_t i = 0; i < newVgroup.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVgroup.vgId, i, newVgroup.vnodeGid[i].dnodeId);
mInfo("vgId:%d, vgroup info before move, replica:%d", newVg.vgId, newVg.replica);
for (int32_t i = 0; i < newVg.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
}
mInfo("vgId:%d, will add 1 vnodes", pVgroup->vgId);
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, &newVg, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
mInfo("vgId:%d, will remove 1 vnodes", pVgroup->vgId);
newVgroup.replica--;
SVnodeGid del = newVgroup.vnodeGid[vnIndex];
newVgroup.vnodeGid[vnIndex] = newVgroup.vnodeGid[newVgroup.replica];
memset(&newVgroup.vnodeGid[newVgroup.replica], 0, sizeof(SVnodeGid));
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
newVg.replica--;
SVnodeGid del = newVg.vnodeGid[vnIndex];
newVg.vnodeGid[vnIndex] = newVg.vnodeGid[newVg.replica];
memset(&newVg.vnodeGid[newVg.replica], 0, sizeof(SVnodeGid));
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg, &del, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
mInfo("vgId:%d, vgroup info after move, replica:%d", newVgroup.vgId, newVgroup.replica);
for (int32_t i = 0; i < newVgroup.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVgroup.vgId, i, newVgroup.vnodeGid[i].dnodeId);
mInfo("vgId:%d, vgroup info after move, replica:%d", newVg.vgId, newVg.replica);
for (int32_t i = 0; i < newVg.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
}
return 0;
}
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t dropDnodeId) {
SArray *pArray = mndBuildDnodesArray(pMnode, dropDnodeId);
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId) {
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId);
if (pArray == NULL) return -1;
void *pIter = NULL;
@ -951,14 +956,14 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t dro
int32_t vnIndex = -1;
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (pVgroup->vnodeGid[i].dnodeId == dropDnodeId) {
if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
vnIndex = i;
break;
}
}
if (vnIndex != -1) {
mInfo("vgId:%d, vnode:%d will be removed from dnode:%d", pVgroup->vgId, vnIndex, dropDnodeId);
mInfo("vgId:%d, vnode:%d will be removed from dnode:%d", pVgroup->vgId, vnIndex, delDnodeId);
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
mndSetMoveVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, vnIndex, pArray);
mndReleaseDb(pMnode, pDb);
@ -969,4 +974,195 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t dro
taosArrayDestroy(pArray);
return 0;
}
}
static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t newDnodeId) {
mDebug("vgId:%d, will add 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId);
SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica];
pVgroup->replica++;
pGid->dnodeId = newDnodeId;
pGid->role = TAOS_SYNC_STATE_ERROR;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pGid, true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
return 0;
}
static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
int32_t delDnodeId) {
mDebug("vgId:%d, will remove 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId);
SVnodeGid *pGid = NULL;
SVnodeGid delGid = {0};
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (pVgroup->vnodeGid[i].dnodeId == delDnodeId) {
pGid = &pVgroup->vnodeGid[i];
break;
}
}
if (pGid == NULL) return 0;
memcpy(&delGid, pGid, sizeof(SVnodeGid));
memcpy(pGid, &pVgroup->vnodeGid[pVgroup->replica], sizeof(SVnodeGid));
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
pVgroup->replica--;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, pVgroup, &delGid, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) return -1;
return 0;
}
static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pNew1,
SDnodeObj *pOld1, SDnodeObj *pNew2, SDnodeObj *pOld2, SDnodeObj *pNew3,
SDnodeObj *pOld3) {
int32_t code = -1;
SSdbRaw *pRaw = NULL;
STrans *pTrans = NULL;
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to drop redistribute vgId:%d", pTrans->id, pVgroup->vgId);
SVgObj newVg = {0};
memcpy(&newVg, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica);
for (int32_t i = 0; i < newVg.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
}
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER;
if (pNew2 != NULL) {
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew2->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld2->id) != 0) goto _OVER;
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew3->id) != 0) goto _OVER;
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld3->id) != 0) goto _OVER;
}
pRaw = mndVgroupActionEncode(&newVg);
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
pRaw = NULL;
mInfo("vgId:%d, vgroup info after redistribute, replica:%d", newVg.vgId, newVg.replica);
for (int32_t i = 0; i < newVg.replica; ++i) {
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
}
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
mndReleaseDb(pMnode, pDb);
return code;
}
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SUserObj *pUser = NULL;
SDnodeObj *pNew1 = NULL;
SDnodeObj *pNew2 = NULL;
SDnodeObj *pNew3 = NULL;
SDnodeObj *pOld1 = NULL;
SDnodeObj *pOld2 = NULL;
SDnodeObj *pOld3 = NULL;
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
int32_t code = -1;
int64_t curMs = taosGetTimestampMs();
SMDropMnodeReq redReq = {0};
#if 0
if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
#endif
mDebug("vgId:%d, start to redistribute", 2);
pUser = mndAcquireUser(pMnode, pReq->conn.user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto _OVER;
}
if (mndCheckNodeAuth(pUser) != 0) {
goto _OVER;
}
pVgroup = mndAcquireVgroup(pMnode, 2);
if (pVgroup == NULL) goto _OVER;
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb == NULL) goto _OVER;
if (pVgroup->replica == 1) {
pNew1 = mndAcquireDnode(pMnode, 1);
pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
if (pNew1 == NULL || pOld1 == NULL) goto _OVER;
if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs)) {
terrno = TSDB_CODE_NODE_OFFLINE;
goto _OVER;
}
if (pNew1 == pOld1) {
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
goto _OVER;
}
if (mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, NULL, NULL, NULL, NULL) != 0) goto _OVER;
}
if (pVgroup->replica == 3) {
pNew1 = mndAcquireDnode(pMnode, 1);
pNew2 = mndAcquireDnode(pMnode, 2);
pNew3 = mndAcquireDnode(pMnode, 3);
pOld1 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[0].dnodeId);
pOld2 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[1].dnodeId);
pOld3 = mndAcquireDnode(pMnode, pVgroup->vnodeGid[2].dnodeId);
if (pNew1 == NULL || pOld1 == NULL || pNew2 == NULL || pOld2 == NULL || pNew3 == NULL || pOld3 == NULL) goto _OVER;
if (!mndIsDnodeOnline(pNew1, curMs) || !mndIsDnodeOnline(pOld1, curMs) || !mndIsDnodeOnline(pNew2, curMs) ||
!mndIsDnodeOnline(pOld2, curMs) || !mndIsDnodeOnline(pNew3, curMs) || !mndIsDnodeOnline(pOld3, curMs)) {
terrno = TSDB_CODE_NODE_OFFLINE;
goto _OVER;
}
bool changed = true;
if (pNew1 != pOld1 || pNew1 != pOld2 || pNew1 != pOld3) changed = true;
if (pNew2 != pOld1 || pNew2 != pOld2 || pNew2 != pOld3) changed = true;
if (pNew3 != pOld1 || pNew3 != pOld2 || pNew3 != pOld3) changed = true;
if (!changed) {
terrno = TSDB_CODE_MND_VGROUP_UN_CHANGED;
goto _OVER;
}
if (mndRedistributeVgroup(pMnode, pReq, pDb, pVgroup, pNew1, pOld1, pNew2, pOld2, pNew3, pOld3) != 0) goto _OVER;
}
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("vgId:%d, failed to redistribute since %s", 1, terrstr());
}
mndReleaseDnode(pMnode, pNew1);
mndReleaseDnode(pMnode, pNew2);
mndReleaseDnode(pMnode, pNew3);
mndReleaseDnode(pMnode, pOld1);
mndReleaseDnode(pMnode, pOld2);
mndReleaseDnode(pMnode, pOld3);
mndReleaseUser(pMnode, pUser);
mndReleaseVgroup(pMnode, pVgroup);
mndReleaseDb(pMnode, pDb);
return code;
}
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; }