Merge pull request #21107 from taosdata/FIX/TD-18702-3.0
enh: support split vgroup
This commit is contained in:
commit
22f06ce27c
|
@ -225,7 +225,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
|
||||
|
||||
|
|
|
@ -7691,4 +7691,4 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) {
|
|||
taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
||||
pSubTopicEp->schema.nCols = 0;
|
||||
taosArrayDestroy(pSubTopicEp->vgs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2084,6 +2084,8 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb
|
|||
return -1;
|
||||
}
|
||||
|
||||
mndSortVnodeGid(&newVgroup);
|
||||
|
||||
{
|
||||
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
|
||||
if (pVgRaw == NULL) return -1;
|
||||
|
@ -2162,6 +2164,7 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
|||
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
|
||||
}
|
||||
|
||||
// alter hash range
|
||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
|
||||
newVg1.vgId = maxVgId;
|
||||
|
@ -2170,26 +2173,31 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
|||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
|
||||
newVg2.vgId = maxVgId;
|
||||
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
|
||||
|
||||
// adjust vgroup replica
|
||||
if (pDb->cfg.replications != newVg1.replica) {
|
||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
||||
} else {
|
||||
pRaw = mndVgroupActionEncode(&newVg1);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
pRaw = NULL;
|
||||
}
|
||||
|
||||
if (pDb->cfg.replications != newVg2.replica) {
|
||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg2, pArray) != 0) goto _OVER;
|
||||
} else {
|
||||
pRaw = mndVgroupActionEncode(&newVg2);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
pRaw = NULL;
|
||||
}
|
||||
|
||||
pRaw = mndVgroupActionEncode(&newVg1);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
pRaw = NULL;
|
||||
|
||||
pRaw = mndVgroupActionEncode(&newVg2);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
pRaw = NULL;
|
||||
|
||||
pRaw = mndVgroupActionEncode(pVgroup);
|
||||
if (pRaw == NULL) goto _OVER;
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
||||
|
@ -2245,7 +2253,12 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
|
|||
if (pDb == NULL) goto _OVER;
|
||||
|
||||
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
if (code != 0) {
|
||||
mError("vgId:%d, failed to start to split vgroup since %s, db:%s", pVgroup->vgId, terrstr(), pDb->name);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mInfo("vgId:%d, split vgroup started successfully. db:%s", pVgroup->vgId, pDb->name);
|
||||
|
||||
_OVER:
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
|
|
|
@ -343,6 +343,7 @@ struct SVnodeCfg {
|
|||
SVnodeStats vndStats;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
bool hashChange;
|
||||
int16_t sttTrigger;
|
||||
int16_t hashPrefix;
|
||||
int16_t hashSuffix;
|
||||
|
|
|
@ -134,6 +134,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
|
||||
|
@ -249,6 +250,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "hashChange", pCfg->hashChange, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code);
|
||||
|
|
|
@ -198,6 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
|||
info.config.vgId = pReq->dstVgId;
|
||||
info.config.hashBegin = pReq->hashBegin;
|
||||
info.config.hashEnd = pReq->hashEnd;
|
||||
info.config.hashChange = true;
|
||||
info.config.walCfg.vgId = pReq->dstVgId;
|
||||
|
||||
SSyncCfg *pCfg = &info.config.syncCfg;
|
||||
|
|
|
@ -1453,11 +1453,30 @@ int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
|
|||
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
|
||||
}
|
||||
|
||||
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t version) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
|
||||
pVnode->config.hashBegin, pVnode->config.hashEnd, version);
|
||||
|
||||
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||
vInfo("vgId:%d, vnode management handle msgType:alter-confirm, alter replica confim msg is processed",
|
||||
TD_VID(pVnode));
|
||||
vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!pVnode->config.hashChange) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
code = vnodeConsolidateAlterHashRange(pVnode, version);
|
||||
pVnode->config.hashChange = false;
|
||||
|
||||
_exit:
|
||||
pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
pRsp->code = code;
|
||||
pRsp->pCont = NULL;
|
||||
pRsp->contLen = 0;
|
||||
|
||||
|
|
|
@ -93,6 +93,19 @@ endi
|
|||
print =============== step4: split
|
||||
print split vgroup 2
|
||||
sql split vgroup 2
|
||||
$wt = 0
|
||||
stepwt1:
|
||||
$wt = $wt + 1
|
||||
sleep 1000
|
||||
if $wt == 200 then
|
||||
print ====> split vgroup not completed!
|
||||
return -1
|
||||
endi
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
print wait 1 seconds to alter
|
||||
goto stepwt1
|
||||
endi
|
||||
|
||||
print =============== step5: check split result
|
||||
sql show d1.tables
|
||||
|
|
Loading…
Reference in New Issue