commit
f974d625b6
|
@ -194,6 +194,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
|
||||||
|
|
||||||
TD_NEW_MSG_SEG(TDMT_QND_MSG)
|
TD_NEW_MSG_SEG(TDMT_QND_MSG)
|
||||||
|
|
|
@ -215,6 +215,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -359,6 +359,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -41,6 +41,7 @@ int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pV
|
||||||
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
int32_t mndAddDropVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool isRedo);
|
||||||
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
|
int32_t mndSetMoveVgroupInfoToTrans(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t vn, SArray *pArray);
|
||||||
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
|
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *, STrans *pTrans, int32_t dropDnodeId);
|
||||||
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray);
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
|
void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *cntlen, bool standby);
|
||||||
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
|
|
|
@ -636,57 +636,6 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
|
||||||
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
SVgObj newVgroup = {0};
|
|
||||||
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
|
||||||
mndTransSetSerial(pTrans);
|
|
||||||
|
|
||||||
if (newVgroup.replica < pDb->cfg.replications) {
|
|
||||||
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
|
|
||||||
pVgroup->vnodeGid[0].dnodeId);
|
|
||||||
|
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
|
||||||
|
|
||||||
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
|
||||||
} else {
|
|
||||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
|
||||||
|
|
||||||
SVnodeGid del1 = {0};
|
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
|
||||||
|
|
||||||
SVnodeGid del2 = {0};
|
|
||||||
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
|
|
||||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
|
||||||
if (pVgRaw == NULL) return -1;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
|
|
||||||
sdbFreeRaw(pVgRaw);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
|
|
@ -55,6 +55,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_HASHRANGE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
|
@ -1166,7 +1167,155 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) { return 0; }
|
int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
||||||
|
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
SVgObj newVgroup = {0};
|
||||||
|
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
|
||||||
|
if (newVgroup.replica < pDb->cfg.replications) {
|
||||||
|
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
|
||||||
|
pVgroup->vnodeGid[0].dnodeId);
|
||||||
|
|
||||||
|
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
||||||
|
|
||||||
|
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
||||||
|
} else if (newVgroup.replica > pDb->cfg.replications) {
|
||||||
|
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
||||||
|
|
||||||
|
SVnodeGid del1 = {0};
|
||||||
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
||||||
|
|
||||||
|
SVnodeGid del2 = {0};
|
||||||
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
|
||||||
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
|
||||||
|
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||||
|
if (pVgRaw == NULL) return -1;
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
|
||||||
|
sdbFreeRaw(pVgRaw);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SSdbRaw *pRaw = NULL;
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||||
|
|
||||||
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq);
|
||||||
|
if (pTrans == NULL) goto _OVER;
|
||||||
|
mndTransSetSerial(pTrans);
|
||||||
|
mDebug("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
|
||||||
|
|
||||||
|
SVgObj newVg1 = {0};
|
||||||
|
memcpy(&newVg1, pVgroup, sizeof(SVgObj));
|
||||||
|
mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
|
||||||
|
newVg1.hashBegin, newVg1.hashEnd);
|
||||||
|
for (int32_t i = 0; i < newVg1.replica; ++i) {
|
||||||
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newVg1.replica == 1) {
|
||||||
|
if (mndAddVnodeToVgroup(pMnode, &newVg1, pArray) != 0) goto _OVER;
|
||||||
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg1, &newVg1.vnodeGid[1], true) != 0) goto _OVER;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
|
} else if (newVg1.replica == 3) {
|
||||||
|
SVnodeGid del1 = {0};
|
||||||
|
if (mndRemoveVnodeFromVgroup(pMnode, &newVg1, pArray, &del1) != 0) goto _OVER;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_REPLICA) != 0) goto _OVER;
|
||||||
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVg1, &del1, true) != 0) goto _OVER;
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
|
} else {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVgObj newVg2 = {0};
|
||||||
|
memcpy(&newVg1, &newVg2, sizeof(SVgObj));
|
||||||
|
newVg1.replica = 1;
|
||||||
|
newVg1.hashEnd = (newVg1.hashBegin + newVg1.hashEnd) / 2;
|
||||||
|
memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));
|
||||||
|
|
||||||
|
newVg2.replica = 1;
|
||||||
|
newVg2.hashBegin = newVg1.hashEnd + 1;
|
||||||
|
memcpy(&newVg2.vnodeGid[0], &newVg2.vnodeGid[1], sizeof(SVnodeGid));
|
||||||
|
memset(&newVg1.vnodeGid[1], 0, sizeof(SVnodeGid));
|
||||||
|
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg1, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
|
||||||
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVg2, TDMT_VND_ALTER_HASHRANGE) != 0) goto _OVER;
|
||||||
|
|
||||||
|
// adjust vgroup
|
||||||
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, &newVg1, pArray) != 0) goto _OVER;
|
||||||
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, &newVg2, pArray) != 0) goto _OVER;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
sdbFreeRaw(pRaw);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
int32_t code = -1;
|
||||||
|
int32_t vgId = 2;
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
|
||||||
|
mDebug("vgId:%d, start to split", vgId);
|
||||||
|
|
||||||
|
pVgroup = mndAcquireVgroup(pMnode, vgId);
|
||||||
|
if (pVgroup == NULL) goto _OVER;
|
||||||
|
|
||||||
|
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||||
|
if (pDb == NULL) goto _OVER;
|
||||||
|
|
||||||
|
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckNodeAuth(pUser) != 0) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
|
||||||
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
static int32_t mndSetBalanceVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup,
|
||||||
SDnodeObj *pSrc, SDnodeObj *pDst) {
|
SDnodeObj *pSrc, SDnodeObj *pDst) {
|
||||||
|
|
|
@ -24,6 +24,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
|
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
|
||||||
|
|
||||||
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
@ -163,6 +164,9 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
||||||
break;
|
break;
|
||||||
|
case TDMT_VND_ALTER_HASHRANGE:
|
||||||
|
vnodeProcessAlterHasnRangeReq(pVnode, version, pReq, len, pRsp);
|
||||||
|
break;
|
||||||
case TDMT_VND_ALTER_CONFIG:
|
case TDMT_VND_ALTER_CONFIG:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -889,3 +893,13 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
|
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
|
||||||
|
|
||||||
|
// todo
|
||||||
|
// 1. stop work
|
||||||
|
// 2. adjust hash range / compact / remove wals / rename vgroups
|
||||||
|
// 3. reload sync
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue