Merge pull request #17293 from taosdata/fix/TD-19441
fix: check memory while alter db buffer
This commit is contained in:
commit
e058d28f08
|
@ -212,38 +212,47 @@ static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SCreateVnodeReq createReq = {0};
|
SCreateVnodeReq req = {0};
|
||||||
SVnodeCfg vnodeCfg = {0};
|
SVnodeCfg vnodeCfg = {0};
|
||||||
SWrapperCfg wrapperCfg = {0};
|
SWrapperCfg wrapperCfg = {0};
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
char path[TSDB_FILENAME_LEN] = {0};
|
char path[TSDB_FILENAME_LEN] = {0};
|
||||||
|
|
||||||
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) {
|
if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo(
|
dInfo(
|
||||||
"vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d "
|
"vgId:%d, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
|
||||||
"tsdbPageSize:%d",
|
" cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
|
||||||
createReq.vgId, createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize,
|
"days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d, wal "
|
||||||
createReq.sstTrigger, createReq.tsdbPageSize);
|
"fsync:%d level:%d retentionPeriod:%d retentionSize:%d rollPeriod:%d segSize:%d, hash method:%d begin:%u end:%u "
|
||||||
dInfo("vgId:%d, hashMethod:%d begin:%u end:%u prefix:%d surfix:%d", createReq.vgId, createReq.hashMethod,
|
"prefix:%d surfix:%d replica:%d selfIndex:%d strict:%d",
|
||||||
createReq.hashBegin, createReq.hashEnd, createReq.hashPrefix, createReq.hashSuffix);
|
req.vgId, req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
|
||||||
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid,
|
||||||
|
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression,
|
||||||
|
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize,
|
||||||
|
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix,
|
||||||
|
req.replica, req.selfIndex, req.strict);
|
||||||
|
for (int32_t i = 0; i < req.replica; ++i) {
|
||||||
|
dInfo("vgId:%d, replica:%d fqdn:%s port:%u", req.vgId, req.replicas[i].id, req.replicas[i].fqdn,
|
||||||
|
req.replicas[i].port);
|
||||||
|
}
|
||||||
|
vmGenerateVnodeCfg(&req, &vnodeCfg);
|
||||||
|
|
||||||
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
if (vmTsmaAdjustDays(&vnodeCfg, &req) < 0) {
|
||||||
dError("vgId:%d, failed to adjust tsma days since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, terrstr());
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
|
vmGenerateWrapperCfg(pMgmt, &req, &wrapperCfg);
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
dDebug("vgId:%d, already exist", createReq.vgId);
|
dDebug("vgId:%d, already exist", req.vgId);
|
||||||
tFreeSCreateVnodeReq(&createReq);
|
tFreeSCreateVnodeReq(&req);
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
@ -252,36 +261,36 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||||
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
||||||
tFreeSCreateVnodeReq(&createReq);
|
tFreeSCreateVnodeReq(&req);
|
||||||
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
|
code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vmTsmaProcessCreate(pImpl, &createReq);
|
code = vmTsmaProcessCreate(pImpl, &req);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to create tsma since %s", req.vgId, terrstr());
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vnodeStart(pImpl);
|
code = vnodeStart(pImpl);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
|
dError("vgId:%d, failed to start sync since %s", req.vgId, terrstr());
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,10 +305,10 @@ _OVER:
|
||||||
vnodeClose(pImpl);
|
vnodeClose(pImpl);
|
||||||
vnodeDestroy(path, pMgmt->pTfs);
|
vnodeDestroy(path, pMgmt->pTfs);
|
||||||
} else {
|
} else {
|
||||||
dInfo("vgId:%d, vnode is created", createReq.vgId);
|
dInfo("vgId:%d, vnode is created", req.vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
tFreeSCreateVnodeReq(&createReq);
|
tFreeSCreateVnodeReq(&req);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,7 +44,8 @@ int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pV
|
||||||
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
||||||
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
|
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
|
||||||
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
|
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
|
||||||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray);
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||||
|
SArray *pArray);
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
|
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
|
||||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
|
|
|
@ -746,7 +746,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
|
@ -756,8 +756,8 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
|
||||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (mndVgroupInDb(pVgroup, pNew->uid)) {
|
if (mndVgroupInDb(pVgroup, pNewDb->uid)) {
|
||||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pNew, pVgroup, pArray) != 0) {
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pOldDb, pNewDb, pVgroup, pArray) != 0) {
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pArray);
|
||||||
|
|
|
@ -405,7 +405,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
||||||
pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);
|
pDnode->memUsed = mndGetVnodesMemory(pMnode, pDnode->id);
|
||||||
|
|
||||||
mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
|
mInfo("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d memory avail:%" PRId64 " used:%" PRId64, pDnode->id,
|
||||||
pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
|
pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online, pDnode->memAvail, pDnode->memUsed);
|
||||||
|
|
||||||
if (isMnode) {
|
if (isMnode) {
|
||||||
pDnode->numOfVnodes++;
|
pDnode->numOfVnodes++;
|
||||||
|
@ -1293,7 +1293,7 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
|
||||||
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
|
mError("db:%s, vgId:%d, no enough memory:%" PRId64 " in dnode:%d avail:%" PRId64 " used:%" PRId64,
|
||||||
pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
|
pVgroup->dbName, pVgroup->vgId, vgMem, pNew3->id, pNew3->memAvail, pNew3->memUsed);
|
||||||
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
|
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else {
|
} else {
|
||||||
pNew3->memUsed += vgMem;
|
pNew3->memUsed += vgMem;
|
||||||
}
|
}
|
||||||
|
@ -1530,44 +1530,81 @@ _OVER:
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
static int32_t mndCheckDnodeMemory(SMnode *pMnode, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pOldVgroup,
|
||||||
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
|
SVgObj *pNewVgroup, SArray *pArray) {
|
||||||
return mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_CONFIG);
|
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
|
||||||
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
|
bool inVgroup = false;
|
||||||
|
for (int32_t j = 0; j < pOldVgroup->replica; ++j) {
|
||||||
|
SVnodeGid *pVgId = &pOldVgroup->vnodeGid[i];
|
||||||
|
if (pDnode->id == pVgId->dnodeId) {
|
||||||
|
pDnode->memUsed -= mndGetVgroupMemory(pMnode, pOldDb, pOldVgroup);
|
||||||
|
inVgroup = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int32_t j = 0; j < pNewVgroup->replica; ++j) {
|
||||||
|
SVnodeGid *pVgId = &pNewVgroup->vnodeGid[i];
|
||||||
|
if (pDnode->id == pVgId->dnodeId) {
|
||||||
|
pDnode->memUsed += mndGetVgroupMemory(pMnode, pNewDb, pNewVgroup);
|
||||||
|
inVgroup = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (pDnode->memAvail - pDnode->memUsed <= 0) {
|
||||||
|
mError("db:%s, vgId:%d, no enough memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
|
||||||
|
pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
|
||||||
|
terrno = TSDB_CODE_MND_NO_ENOUGH_MEM_IN_DNODE;
|
||||||
|
return -1;
|
||||||
|
} else if (inVgroup) {
|
||||||
|
mInfo("db:%s, vgId:%d, memory in dnode:%d, avail:%" PRId64 " used:%" PRId64, pNewVgroup->dbName,
|
||||||
|
pNewVgroup->vgId, pDnode->id, pDnode->memAvail, pDnode->memUsed);
|
||||||
|
} else {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, SVgObj *pVgroup,
|
||||||
|
SArray *pArray) {
|
||||||
SVgObj newVgroup = {0};
|
SVgObj newVgroup = {0};
|
||||||
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
||||||
|
|
||||||
|
if (pVgroup->replica <= 0 || pVgroup->replica == pNewDb->cfg.replications) {
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) return -1;
|
||||||
|
if (mndCheckDnodeMemory(pMnode, pOldDb, pNewDb, &newVgroup, pVgroup, pArray) != 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
mndTransSetSerial(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
if (newVgroup.replica < pDb->cfg.replications) {
|
if (newVgroup.replica < pNewDb->cfg.replications) {
|
||||||
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
|
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
|
||||||
pVgroup->vnodeGid[0].dnodeId);
|
pVgroup->vnodeGid[0].dnodeId);
|
||||||
|
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||||
|
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||||
} else if (newVgroup.replica > pDb->cfg.replications) {
|
} else if (newVgroup.replica > pNewDb->cfg.replications) {
|
||||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
||||||
|
|
||||||
SVnodeGid del1 = {0};
|
SVnodeGid del1 = {0};
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del1, true) != 0) return -1;
|
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del1, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del1, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||||
|
|
||||||
SVnodeGid del2 = {0};
|
SVnodeGid del2 = {0};
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
||||||
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pDb, pVgroup, &del2, true) != 0) return -1;
|
if (mndAddSetVnodeStandByAction(pMnode, pTrans, pNewDb, pVgroup, &del2, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pNewDb, &newVgroup, &del2, true) != 0) return -1;
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pNewDb, &newVgroup) != 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1648,8 +1685,8 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg2, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg2, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
|
||||||
|
|
||||||
// adjust vgroup
|
// adjust vgroup
|
||||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, &newVg1, pArray) != 0) goto _OVER;
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
||||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, &newVg2, pArray) != 0) goto _OVER;
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
@ -1782,7 +1819,7 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
SDnodeObj *pDnode = taosArrayGet(pArray, i);
|
||||||
mInfo("dnode:%d, equivalent vnodes:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
|
mInfo("dnode:%d, equivalent vnodes:%d support:%d, score:%f", pDnode->id, pDnode->numOfVnodes,
|
||||||
pDnode->numOfSupportVnodes, (float)pDnode->numOfVnodes / pDnode->numOfSupportVnodes);
|
pDnode->numOfSupportVnodes, (float)pDnode->numOfVnodes / pDnode->numOfSupportVnodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
|
SDnodeObj *pSrc = taosArrayGet(pArray, taosArrayGetSize(pArray) - 1);
|
||||||
|
@ -1791,7 +1828,7 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
|
||||||
float srcScore = (float)(pSrc->numOfVnodes - 1) / pSrc->numOfSupportVnodes;
|
float srcScore = (float)(pSrc->numOfVnodes - 1) / pSrc->numOfSupportVnodes;
|
||||||
float dstScore = (float)(pDst->numOfVnodes + 1) / pDst->numOfSupportVnodes;
|
float dstScore = (float)(pDst->numOfVnodes + 1) / pDst->numOfSupportVnodes;
|
||||||
mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
|
mInfo("trans:%d, after balance, src dnode:%d score:%f, dst dnode:%d score:%f", pTrans->id, pSrc->id, srcScore,
|
||||||
pDst->id, dstScore);
|
pDst->id, dstScore);
|
||||||
|
|
||||||
if (srcScore > dstScore - 0.000001) {
|
if (srcScore > dstScore - 0.000001) {
|
||||||
code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
|
code = mndBalanceVgroupBetweenDnode(pMnode, pTrans, pSrc, pDst, pBalancedVgroups);
|
||||||
|
|
|
@ -1024,71 +1024,75 @@ static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, vo
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SAlterVnodeReq alterReq = {0};
|
SAlterVnodeReq req = {0};
|
||||||
bool walChanged = false;
|
bool walChanged = false;
|
||||||
bool tsdbChanged = false;
|
bool tsdbChanged = false;
|
||||||
|
|
||||||
if (tDeserializeSAlterVnodeReq(pReq, len, &alterReq) != 0) {
|
if (tDeserializeSAlterVnodeReq(pReq, len, &req) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
vInfo("vgId:%d, start to alter vnode config, cacheLast:%d cacheLastSize:%d", TD_VID(pVnode), alterReq.cacheLast,
|
vInfo("vgId:%d, start to alter vnode config, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64
|
||||||
alterReq.cacheLastSize);
|
" cacheLast:%d cacheLastSize:%d days:%d keep0:%d keep1:%d keep2:%d fsync:%d level:%d strict:%d",
|
||||||
if (pVnode->config.cacheLastSize != alterReq.cacheLastSize) {
|
TD_VID(pVnode), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, (uint64_t)req.buffer * 1024 * 1024,
|
||||||
pVnode->config.cacheLastSize = alterReq.cacheLastSize;
|
req.cacheLast, req.cacheLastSize, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
|
||||||
|
req.walFsyncPeriod, req.walLevel, req.strict);
|
||||||
|
|
||||||
|
if (pVnode->config.cacheLastSize != req.cacheLastSize) {
|
||||||
|
pVnode->config.cacheLastSize = req.cacheLastSize;
|
||||||
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
tsdbCacheSetCapacity(pVnode, (size_t)pVnode->config.cacheLastSize * 1024 * 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.szBuf != alterReq.buffer * 1024LL * 1024LL) {
|
if (pVnode->config.szBuf != req.buffer * 1024LL * 1024LL) {
|
||||||
vInfo("vgId:%d vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
|
vInfo("vgId:%d vnode buffer is changed from %" PRId64 " to %" PRId64, TD_VID(pVnode), pVnode->config.szBuf,
|
||||||
alterReq.buffer * 1024LL * 1024LL);
|
req.buffer * 1024LL * 1024LL);
|
||||||
pVnode->config.szBuf = alterReq.buffer * 1024LL * 1024LL;
|
pVnode->config.szBuf = req.buffer * 1024LL * 1024LL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.szCache != alterReq.pages) {
|
if (pVnode->config.szCache != req.pages) {
|
||||||
if (metaAlterCache(pVnode->pMeta, alterReq.pages) < 0) {
|
if (metaAlterCache(pVnode->pMeta, req.pages) < 0) {
|
||||||
vError("vgId:%d failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
|
vError("vgId:%d failed to change vnode pages from %d to %d failed since %s", TD_VID(pVnode),
|
||||||
pVnode->config.szCache, alterReq.pages, tstrerror(errno));
|
pVnode->config.szCache, req.pages, tstrerror(errno));
|
||||||
return errno;
|
return errno;
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, alterReq.pages);
|
vInfo("vgId:%d vnode pages is changed from %d to %d", TD_VID(pVnode), pVnode->config.szCache, req.pages);
|
||||||
pVnode->config.szCache = alterReq.pages;
|
pVnode->config.szCache = req.pages;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.cacheLast != alterReq.cacheLast) {
|
if (pVnode->config.cacheLast != req.cacheLast) {
|
||||||
pVnode->config.cacheLast = alterReq.cacheLast;
|
pVnode->config.cacheLast = req.cacheLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.walCfg.fsyncPeriod != alterReq.walFsyncPeriod) {
|
if (pVnode->config.walCfg.fsyncPeriod != req.walFsyncPeriod) {
|
||||||
pVnode->config.walCfg.fsyncPeriod = alterReq.walFsyncPeriod;
|
pVnode->config.walCfg.fsyncPeriod = req.walFsyncPeriod;
|
||||||
|
|
||||||
walChanged = true;
|
walChanged = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.walCfg.level != alterReq.walLevel) {
|
if (pVnode->config.walCfg.level != req.walLevel) {
|
||||||
pVnode->config.walCfg.level = alterReq.walLevel;
|
pVnode->config.walCfg.level = req.walLevel;
|
||||||
|
|
||||||
walChanged = true;
|
walChanged = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.tsdbCfg.keep0 != alterReq.daysToKeep0) {
|
if (pVnode->config.tsdbCfg.keep0 != req.daysToKeep0) {
|
||||||
pVnode->config.tsdbCfg.keep0 = alterReq.daysToKeep0;
|
pVnode->config.tsdbCfg.keep0 = req.daysToKeep0;
|
||||||
if (!VND_IS_RSMA(pVnode)) {
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
tsdbChanged = true;
|
tsdbChanged = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.tsdbCfg.keep1 != alterReq.daysToKeep1) {
|
if (pVnode->config.tsdbCfg.keep1 != req.daysToKeep1) {
|
||||||
pVnode->config.tsdbCfg.keep1 = alterReq.daysToKeep1;
|
pVnode->config.tsdbCfg.keep1 = req.daysToKeep1;
|
||||||
if (!VND_IS_RSMA(pVnode)) {
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
tsdbChanged = true;
|
tsdbChanged = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->config.tsdbCfg.keep2 != alterReq.daysToKeep2) {
|
if (pVnode->config.tsdbCfg.keep2 != req.daysToKeep2) {
|
||||||
pVnode->config.tsdbCfg.keep2 = alterReq.daysToKeep2;
|
pVnode->config.tsdbCfg.keep2 = req.daysToKeep2;
|
||||||
if (!VND_IS_RSMA(pVnode)) {
|
if (!VND_IS_RSMA(pVnode)) {
|
||||||
tsdbChanged = true;
|
tsdbChanged = true;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue