enh: add mndAddVgStatusAction and mndAddDbStatusAction

This commit is contained in:
Benguang Zhao 2023-06-01 17:58:27 +08:00
parent 28efd7dcd7
commit 2ef2715ee1
1 changed files with 43 additions and 24 deletions

View File

@ -2250,10 +2250,13 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans,
return 0; return 0;
} }
static int32_t mndTransCommitVgStatus(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus) { typedef int32_t (*FpTransActionCb)(STrans *pTrans, SSdbRaw *pRaw);
static int32_t mndAddVgStatusAction(STrans *pTrans, SVgObj *pVg, ESdbStatus vgStatus, ETrnStage stage) {
FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
SSdbRaw *pRaw = mndVgroupActionEncode(pVg); SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err; if (pRaw == NULL) goto _err;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _err; if (appendActionCb(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, vgStatus); (void)sdbSetRawStatus(pRaw, vgStatus);
pRaw = NULL; pRaw = NULL;
return 0; return 0;
@ -2262,10 +2265,22 @@ _err:
return -1; return -1;
} }
static int32_t mndAddDbStatusAction(STrans *pTrans, SDbObj *pDb, ESdbStatus dbStatus, ETrnStage stage) {
FpTransActionCb appendActionCb = (stage == TRN_STAGE_COMMIT_ACTION) ? mndTransAppendCommitlog : mndTransAppendRedolog;
SSdbRaw *pRaw = mndDbActionEncode(pDb);
if (pRaw == NULL) goto _err;
if (appendActionCb(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, dbStatus);
pRaw = NULL;
return 0;
_err:
sdbFreeRaw(pRaw);
return -1;
}
int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = NULL; STrans *pTrans = NULL;
SSdbRaw *pRaw = NULL;
SDbObj dbObj = {0}; SDbObj dbObj = {0};
SArray *pArray = mndBuildDnodesArray(pMnode, 0); SArray *pArray = mndBuildDnodesArray(pMnode, 0);
@ -2339,24 +2354,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
// adjust vgroup replica if (mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (pDb->cfg.replications != newVg1.replica) { if (mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER; if (mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
} else {
if (mndTransCommitVgStatus(pTrans, &newVg1, SDB_STATUS_READY) < 0) goto _OVER;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else {
if (mndTransCommitVgStatus(pTrans, &newVg2, SDB_STATUS_READY) < 0) goto _OVER;
}
if (mndTransCommitVgStatus(pTrans, pVgroup, SDB_STATUS_DROPPED) < 0) goto _OVER;
// update db status
memcpy(&dbObj, pDb, sizeof(SDbObj)); memcpy(&dbObj, pDb, sizeof(SDbObj));
if (dbObj.cfg.pRetensions != NULL) { if (dbObj.cfg.pRetensions != NULL) {
dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL); dbObj.cfg.pRetensions = taosArrayDup(pDb->cfg.pRetensions, NULL);
@ -2365,11 +2369,27 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
dbObj.vgVersion++; dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs(); dbObj.updateTime = taosGetTimestampMs();
dbObj.cfg.numOfVgroups++; dbObj.cfg.numOfVgroups++;
pRaw = mndDbActionEncode(&dbObj); if (mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_REDO_ACTION) < 0) goto _OVER;
if (pRaw == NULL) goto _OVER;
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER; // adjust vgroup replica
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); if (pDb->cfg.replications != newVg1.replica) {
pRaw = NULL; if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
} else {
if (mndAddVgStatusAction(pTrans, &newVg1, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
}
if (pDb->cfg.replications != newVg2.replica) {
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
} else {
if (mndAddVgStatusAction(pTrans, &newVg2, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
}
if (mndAddVgStatusAction(pTrans, pVgroup, SDB_STATUS_DROPPED, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
// commit db status
dbObj.vgVersion++;
dbObj.updateTime = taosGetTimestampMs();
if (mndAddDbStatusAction(pTrans, &dbObj, SDB_STATUS_READY, TRN_STAGE_COMMIT_ACTION) < 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
@ -2377,7 +2397,6 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
_OVER: _OVER:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
mndTransDrop(pTrans); mndTransDrop(pTrans);
sdbFreeRaw(pRaw);
taosArrayDestroy(dbObj.cfg.pRetensions); taosArrayDestroy(dbObj.cfg.pRetensions);
return code; return code;
} }