commit
c77095bbb1
|
@ -219,6 +219,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_VGROUP_NOT_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0391)
|
||||
#define TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE TAOS_DEF_ERROR_CODE(0, 0x0392)
|
||||
#define TSDB_CODE_MND_VGROUP_UN_CHANGED TAOS_DEF_ERROR_CODE(0, 0x0393)
|
||||
#define TSDB_CODE_MND_HAS_OFFLINE_DNODE TAOS_DEF_ERROR_CODE(0, 0x0394)
|
||||
|
||||
// mnode-stable
|
||||
#define TSDB_CODE_MND_STB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
|
||||
|
|
|
@ -38,6 +38,7 @@ static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter);
|
|||
|
||||
static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq);
|
||||
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq);
|
||||
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq);
|
||||
|
||||
int32_t mndInitVgroup(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -1165,4 +1166,159 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; }
|
||||
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; }
|
||||
|
||||
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||
SDnodeObj *pSrc, SDnodeObj *pDst) {
|
||||
SVgObj newVg = {0};
|
||||
memcpy(&newVg, pVgroup, sizeof(SVgObj));
|
||||
mInfo("vgId:%d, vgroup info before balance, replica:%d", newVg.vgId, newVg.replica);
|
||||
for (int32_t i = 0; i < newVg.replica; ++i) {
|
||||
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
|
||||
}
|
||||
|
||||
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pDst->id) != 0) return -1;
|
||||
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pSrc->id) != 0) return -1;
|
||||
|
||||
SSdbRaw *pRaw = mndVgroupActionEncode(&newVg);
|
||||
if (pRaw == NULL || mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||
sdbFreeRaw(pRaw);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mInfo("vgId:%d, vgroup info after balance, replica:%d", newVg.vgId, newVg.replica);
|
||||
for (int32_t i = 0; i < newVg.replica; ++i) {
|
||||
mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBalanceVgroupBetweenDnode(SMnode *pMnode, STrans *pTrans, SDnodeObj *pSrc, SDnodeObj *pDst) {
|
||||
void *pIter = NULL;
|
||||
int32_t code = -1;
|
||||
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
bool existInSrc = false;
|
||||
bool existInDst = false;
|
||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||
SVnodeGid *pGid = &pVgroup->vnodeGid[i];
|
||||
if (pGid->dnodeId == pSrc->id) existInSrc = true;
|
||||
if (pGid->dnodeId == pDst->id) existInDst = true;
|
||||
}
|
||||
|
||||
if (!existInSrc || existInDst) {
|
||||
sdbRelease(pMnode->pSdb, pVgroup);
|
||||
}
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||
code = mndSetBalanceVgroupInfoToTrans(pMnode, pTrans, pDb, pVgroup, pSrc, pDst);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
sdbRelease(pMnode->pSdb, pVgroup);
|
||||
break;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
|
||||
int32_t code = -1;
|
||||
int32_t numOfVgroups = 0;
|
||||
STrans *pTrans = NULL;
|
||||
|
||||
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
mndTransSetSerial(pTrans);
|
||||
mDebug("trans:%d, used to balance vgroup", pTrans->id);
|
||||
|
||||
while (1) {
|
||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||
SDnodeObj *pSrc = taosArrayGet(pArray, 0);
|
||||
SDnodeObj *pDst = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
|
||||
|
||||
float srcScore = (float)(pSrc->numOfVnodes - 1) / pSrc->numOfSupportVnodes;
|
||||
float dstScore = (float)(pDst->numOfVnodes + 1) / pDst->numOfSupportVnodes;
|
||||
if (srcScore + 0.0001 < dstScore) {
|
||||
mDebug("trans:%d, balance vgroup from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
|
||||
code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst);
|
||||
if (code == 0) {
|
||||
numOfVgroups++;
|
||||
continue;
|
||||
} else {
|
||||
mError("trans:%d, failed to balance vgroup from dnode:%d to dnode:%d", pTrans->id, pSrc->id, pDst->id);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
mDebug("trans:%d, no vgroup need to balance vgroup any more", pTrans->id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (numOfVgroups <= 0) {
|
||||
mDebug("no need to balance vgroup");
|
||||
code = 0;
|
||||
} else {
|
||||
mDebug("start to balance vgroup, numOfVgroups:%d", numOfVgroups);
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
int32_t code = -1;
|
||||
SUserObj *pUser = NULL;
|
||||
SArray *pArray = NULL;
|
||||
void *pIter = NULL;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
|
||||
mDebug("start to balance vgroup");
|
||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||
if (pUser == NULL) {
|
||||
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndCheckNodeAuth(pUser) != 0) goto _OVER;
|
||||
|
||||
while (1) {
|
||||
SDnodeObj *pDnode = NULL;
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_DNODE, pIter, (void **)&pDnode);
|
||||
if (pIter == NULL) break;
|
||||
if (!mndIsDnodeOnline(pDnode, curMs)) {
|
||||
terrno = TSDB_CODE_MND_HAS_OFFLINE_DNODE;
|
||||
mError("failed to balance vgroup since %s, dnode:%d", terrstr(), pDnode->id);
|
||||
sdbRelease(pMnode->pSdb, pDnode);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
sdbRelease(pMnode->pSdb, pDnode);
|
||||
}
|
||||
|
||||
pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
if (pArray == NULL) goto _OVER;
|
||||
|
||||
if (taosArrayGetSize(pArray) < 2) {
|
||||
mDebug("no need to balance vgroup since dnode num less than 2");
|
||||
code = 0;
|
||||
} else {
|
||||
code = mndBalanceVgroup(pMnode, pReq, pArray);
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("failed to balance vgroup since %s", terrstr());
|
||||
}
|
||||
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
taosArrayDestroy(pArray);
|
||||
return code;
|
||||
}
|
|
@ -222,7 +222,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist")
|
|||
// mnode-vgroup
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "Vgroup does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_UN_CHANGED, "Vgroup distribution has not changed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_HAS_OFFLINE_DNODE, "Offline dnode exists")
|
||||
|
||||
// mnode-stable
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
|
||||
|
|
Loading…
Reference in New Issue