fix: alter db replications
This commit is contained in:
parent
1d09cc1043
commit
666b0b676d
|
@ -293,14 +293,10 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_WRITE_EXEC, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_ALTER_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_COMPACT_VNODE, vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||||
// dmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
|
||||||
// dmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, DEFAULT_HANDLE);
|
|
||||||
|
|
||||||
// sync integration
|
// sync integration
|
||||||
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
dmSetMsgHandle(pWrapper, TDMT_VND_SYNC_TIMEOUT, (NodeMsgFp)vmProcessSyncMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -627,6 +627,7 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
|
||||||
|
|
||||||
if (pAlter->replications >= 0 && pAlter->replications != pDb->cfg.replications) {
|
if (pAlter->replications >= 0 && pAlter->replications != pDb->cfg.replications) {
|
||||||
pDb->cfg.replications = pAlter->replications;
|
pDb->cfg.replications = pAlter->replications;
|
||||||
|
pDb->vgVersion++;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -652,6 +653,7 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
|
if (pVgroup->replica == pDb->cfg.replications) {
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||||
|
@ -673,6 +675,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else if (pVgroup->replica < pDb->cfg.replications) {
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -726,6 +731,7 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
SUserObj *pUser = NULL;
|
SUserObj *pUser = NULL;
|
||||||
SAlterDbReq alterReq = {0};
|
SAlterDbReq alterReq = {0};
|
||||||
|
SDbObj dbObj = {0};
|
||||||
|
|
||||||
if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
|
if (tDeserializeSAlterDbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -749,15 +755,14 @@ static int32_t mndProcessAlterDbReq(SNodeMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDbObj dbObj = {0};
|
|
||||||
memcpy(&dbObj, pDb, sizeof(SDbObj));
|
memcpy(&dbObj, pDb, sizeof(SDbObj));
|
||||||
dbObj.cfg.numOfRetensions = 0;
|
if (dbObj.cfg.pRetensions != NULL) {
|
||||||
dbObj.cfg.pRetensions = NULL;
|
dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions);
|
||||||
|
if (dbObj.cfg.pRetensions == NULL) goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq);
|
code = mndSetDbCfgFromAlterDbReq(&dbObj, &alterReq);
|
||||||
if (code != 0) {
|
if (code != 0) goto _OVER;
|
||||||
goto _OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dbObj.cfgVersion++;
|
dbObj.cfgVersion++;
|
||||||
dbObj.updateTime = taosGetTimestampMs();
|
dbObj.updateTime = taosGetTimestampMs();
|
||||||
|
@ -771,6 +776,7 @@ _OVER:
|
||||||
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
taosArrayDestroy(dbObj.cfg.pRetensions);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue