fix: alter db replications
This commit is contained in:
parent
ae23dd2382
commit
0886d23e02
|
@ -31,8 +31,9 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup);
|
|||
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
|
||||
|
||||
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
|
||||
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *new1, SVnodeGid *new2, SVnodeGid *exist);
|
||||
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *del1, SVnodeGid *del2, SVnodeGid *exist);
|
||||
SArray *mndBuildDnodesArray(SMnode *pMnode);
|
||||
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
|
||||
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2);
|
||||
|
||||
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||
|
|
|
@ -399,6 +399,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
|
|||
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
|
||||
if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1;
|
||||
if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1;
|
||||
if (pCfg->replications != 1 && pCfg->replications != 3) return -1;
|
||||
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
|
||||
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
|
||||
if (pCfg->hashMethod != 1) return -1;
|
||||
|
@ -720,7 +721,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
||||
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
|
||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||
|
@ -732,27 +733,30 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
SVgObj newVgroup = {0};
|
||||
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
||||
if (newVgroup.replica < pDb->cfg.replications) {
|
||||
SVnodeGid new1 = {0};
|
||||
SVnodeGid new2 = {0};
|
||||
SVnodeGid exist = {0};
|
||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, &new1, &new2, &exist) != 0) {
|
||||
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
|
||||
pVgroup->vnodeGid[0].dnodeId);
|
||||
|
||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) {
|
||||
mError("db:%s, failed to add vnode to vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &new1, true) != 0) return -1;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &new2, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &exist, true) != 0) return -1;
|
||||
newVgroup.replica = pDb->cfg.replications;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[0], true) != 0) return -1;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
||||
} else {
|
||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
||||
|
||||
SVnodeGid del1 = {0};
|
||||
SVnodeGid del2 = {0};
|
||||
SVnodeGid exist = {0};
|
||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, &del1, &del2, &exist) != 0) {
|
||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1, &del2) != 0) {
|
||||
mError("db:%s, failed to remove vnode from vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
newVgroup.replica = pDb->cfg.replications;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[0], true) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
|
||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
|
||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &exist, true) != 0) return -1;
|
||||
}
|
||||
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||
|
@ -765,8 +769,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
}
|
||||
|
||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode);
|
||||
|
||||
while (1) {
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
@ -774,9 +779,10 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pVgroup->dbUid == pNew->uid) {
|
||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) {
|
||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
taosArrayDestroy(pArray);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -784,6 +790,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pArray);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -370,7 +370,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
|||
return true;
|
||||
}
|
||||
|
||||
static SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
||||
SArray *mndBuildDnodesArray(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
|
||||
|
||||
|
@ -421,7 +421,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pAr
|
|||
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
|
||||
}
|
||||
|
||||
mDebug("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
|
||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is alloced", pVgroup->dbName, pVgroup->vgId, v, pVgid->dnodeId);
|
||||
pDnode->numOfVnodes++;
|
||||
}
|
||||
|
||||
|
@ -440,12 +440,10 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
|||
}
|
||||
|
||||
pArray = mndBuildDnodesArray(pMnode);
|
||||
if (pArray == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
if (pArray == NULL) goto _OVER;
|
||||
|
||||
mDebug("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
|
||||
pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);
|
||||
mInfo("db:%s, total %d dnodes used to create %d vgroups (%d vnodes)", pDb->name, (int32_t)taosArrayGetSize(pArray),
|
||||
pDb->cfg.numOfVgroups, pDb->cfg.numOfVgroups * pDb->cfg.replications);
|
||||
|
||||
int32_t allocedVgroups = 0;
|
||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||
|
@ -483,7 +481,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
|
|||
*ppVgroups = pVgroups;
|
||||
code = 0;
|
||||
|
||||
mDebug("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
|
||||
mInfo("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
|
||||
|
||||
_OVER:
|
||||
if (code != 0) taosMemoryFree(pVgroups);
|
||||
|
@ -491,11 +489,85 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *new1, SVnodeGid *new2, SVnodeGid *exist) {
|
||||
return 0;
|
||||
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
|
||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
||||
}
|
||||
|
||||
int32_t maxPos = 1;
|
||||
for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
||||
|
||||
bool used = false;
|
||||
for (int32_t vn = 0; vn < maxPos; ++vn) {
|
||||
if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
|
||||
used = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (used) continue;
|
||||
|
||||
if (pDnode == NULL || pDnode->numOfVnodes > pDnode->numOfSupportVnodes) {
|
||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos];
|
||||
pVgid->dnodeId = pDnode->id;
|
||||
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
|
||||
pDnode->numOfVnodes++;
|
||||
|
||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
|
||||
maxPos++;
|
||||
if (maxPos == 3) return 0;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SVnodeGid *del1, SVnodeGid *del2, SVnodeGid *exist) {
|
||||
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) {
|
||||
int32_t removedNum = 0;
|
||||
|
||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
|
||||
}
|
||||
|
||||
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
|
||||
SDnodeObj *pDnode = taosArrayGet(pArray, d);
|
||||
|
||||
for (int32_t vn = 0; vn < TSDB_MAX_REPLICA; ++vn) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
||||
if (pVgid->dnodeId == pDnode->id) {
|
||||
if (removedNum == 0) *del1 = *pVgid;
|
||||
if (removedNum == 1) *del2 = *pVgid;
|
||||
|
||||
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
|
||||
memset(pVgid, 0, sizeof(SVnodeGid));
|
||||
removedNum++;
|
||||
pDnode->numOfVnodes--;
|
||||
|
||||
if (removedNum == 2) goto _OVER;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (removedNum != 2) return -1;
|
||||
|
||||
for (int32_t vn = 1; vn < TSDB_MAX_REPLICA; ++vn) {
|
||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
|
||||
if (pVgid->dnodeId != 0) {
|
||||
memcpy(&pVgroup->vnodeGid[0], pVgid, sizeof(SVnodeGid));
|
||||
memset(pVgid, 0, sizeof(SVnodeGid));
|
||||
}
|
||||
}
|
||||
|
||||
mInfo("db:%s, vgId:%d, dnode:%d is keeped", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -266,4 +266,96 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
|
|||
taosMsleep(1300);
|
||||
test.SendShowReq(TSDB_MGMT_TABLE_DNODE, "dnodes", "");
|
||||
EXPECT_EQ(test.GetShowRows(), 4);
|
||||
|
||||
// alter replica
|
||||
#if 0
|
||||
{
|
||||
SCreateDbReq createReq = {0};
|
||||
strcpy(createReq.db, "1.d2");
|
||||
createReq.numOfVgroups = 2;
|
||||
createReq.buffer = -1;
|
||||
createReq.pageSize = -1;
|
||||
createReq.pages = -1;
|
||||
createReq.daysPerFile = 1000;
|
||||
createReq.daysToKeep0 = 3650;
|
||||
createReq.daysToKeep1 = 3650;
|
||||
createReq.daysToKeep2 = 3650;
|
||||
createReq.minRows = 100;
|
||||
createReq.maxRows = 4096;
|
||||
createReq.fsyncPeriod = 3000;
|
||||
createReq.walLevel = 1;
|
||||
createReq.precision = 0;
|
||||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.strict = 1;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
createReq.numOfStables = 0;
|
||||
createReq.numOfRetensions = 0;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSCreateDbReq(pReq, contLen, &createReq);
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
|
||||
test.SendShowReq(TSDB_MGMT_TABLE_DB, "user_databases", "");
|
||||
EXPECT_EQ(test.GetShowRows(), 3);
|
||||
}
|
||||
|
||||
{
|
||||
SAlterDbReq alterdbReq = {0};
|
||||
strcpy(alterdbReq.db, "1.d2");
|
||||
|
||||
alterdbReq.buffer = 12;
|
||||
alterdbReq.pageSize = -1;
|
||||
alterdbReq.pages = -1;
|
||||
alterdbReq.daysPerFile = -1;
|
||||
alterdbReq.daysToKeep0 = -1;
|
||||
alterdbReq.daysToKeep1 = -1;
|
||||
alterdbReq.daysToKeep2 = -1;
|
||||
alterdbReq.fsyncPeriod = 4000;
|
||||
alterdbReq.walLevel = 2;
|
||||
alterdbReq.strict = 1;
|
||||
alterdbReq.cacheLastRow = 1;
|
||||
alterdbReq.replications = 3;
|
||||
|
||||
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSAlterDbReq(pReq, contLen, &alterdbReq);
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
SAlterDbReq alterdbReq = {0};
|
||||
strcpy(alterdbReq.db, "1.d2");
|
||||
|
||||
alterdbReq.buffer = 12;
|
||||
alterdbReq.pageSize = -1;
|
||||
alterdbReq.pages = -1;
|
||||
alterdbReq.daysPerFile = -1;
|
||||
alterdbReq.daysToKeep0 = -1;
|
||||
alterdbReq.daysToKeep1 = -1;
|
||||
alterdbReq.daysToKeep2 = -1;
|
||||
alterdbReq.fsyncPeriod = 4000;
|
||||
alterdbReq.walLevel = 2;
|
||||
alterdbReq.strict = 1;
|
||||
alterdbReq.cacheLastRow = 1;
|
||||
alterdbReq.replications = 1;
|
||||
|
||||
int32_t contLen = tSerializeSAlterDbReq(NULL, 0, &alterdbReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSAlterDbReq(pReq, contLen, &alterdbReq);
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_DB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
#endif
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue