enh: confirm alter hash range
This commit is contained in:
parent
67eac1a76f
commit
5d63f438b5
|
@ -225,7 +225,6 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL)
|
||||||
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL)
|
||||||
|
|
||||||
|
|
|
@ -7691,4 +7691,4 @@ void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) {
|
||||||
taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
||||||
pSubTopicEp->schema.nCols = 0;
|
pSubTopicEp->schema.nCols = 0;
|
||||||
taosArrayDestroy(pSubTopicEp->vgs);
|
taosArrayDestroy(pSubTopicEp->vgs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2164,6 +2164,7 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
||||||
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// alter hash range
|
||||||
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
|
||||||
newVg1.vgId = maxVgId;
|
newVg1.vgId = maxVgId;
|
||||||
|
@ -2172,6 +2173,10 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
|
||||||
newVg2.vgId = maxVgId;
|
newVg2.vgId = maxVgId;
|
||||||
|
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
||||||
|
|
||||||
|
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
|
||||||
|
|
||||||
// adjust vgroup replica
|
// adjust vgroup replica
|
||||||
if (pDb->cfg.replications != newVg1.replica) {
|
if (pDb->cfg.replications != newVg1.replica) {
|
||||||
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
if (mndBuildAlterVgroupAction(pMnode, pTrans, pDb, pDb, &newVg1, pArray) != 0) goto _OVER;
|
||||||
|
|
|
@ -343,6 +343,7 @@ struct SVnodeCfg {
|
||||||
SVnodeStats vndStats;
|
SVnodeStats vndStats;
|
||||||
uint32_t hashBegin;
|
uint32_t hashBegin;
|
||||||
uint32_t hashEnd;
|
uint32_t hashEnd;
|
||||||
|
bool hashChange;
|
||||||
int16_t sttTrigger;
|
int16_t sttTrigger;
|
||||||
int16_t hashPrefix;
|
int16_t hashPrefix;
|
||||||
int16_t hashSuffix;
|
int16_t hashSuffix;
|
||||||
|
|
|
@ -134,6 +134,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "hashChange", pCfg->hashChange) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashPrefix", pCfg->hashPrefix) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashSuffix", pCfg->hashSuffix) < 0) return -1;
|
||||||
|
@ -249,6 +250,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
|
tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
|
tjsonGetNumberValue(pJson, "hashChange", pCfg->hashChange, code);
|
||||||
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
|
tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) return -1;
|
||||||
tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code);
|
tjsonGetNumberValue(pJson, "hashPrefix", pCfg->hashPrefix, code);
|
||||||
|
|
|
@ -198,6 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
|
||||||
info.config.vgId = pReq->dstVgId;
|
info.config.vgId = pReq->dstVgId;
|
||||||
info.config.hashBegin = pReq->hashBegin;
|
info.config.hashBegin = pReq->hashBegin;
|
||||||
info.config.hashEnd = pReq->hashEnd;
|
info.config.hashEnd = pReq->hashEnd;
|
||||||
|
info.config.hashChange = true;
|
||||||
info.config.walCfg.vgId = pReq->dstVgId;
|
info.config.walCfg.vgId = pReq->dstVgId;
|
||||||
|
|
||||||
SSyncCfg *pCfg = &info.config.syncCfg;
|
SSyncCfg *pCfg = &info.config.syncCfg;
|
||||||
|
|
|
@ -1453,11 +1453,30 @@ int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
|
||||||
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
|
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeConsolidateAlterHashRange(SVnode *pVnode, int64_t version) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
vInfo("vgId:%d, trim meta of tables per hash range [%" PRIu32 ", %" PRIu32 "]. apply-index:%" PRId64, TD_VID(pVnode),
|
||||||
|
pVnode->config.hashBegin, pVnode->config.hashEnd, version);
|
||||||
|
|
||||||
|
// TODO: trim meta of tables from TDB per hash range [pVnode->config.hashBegin, pVnode->config.hashEnd]
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
vInfo("vgId:%d, vnode management handle msgType:alter-confirm, alter replica confim msg is processed",
|
vInfo("vgId:%d, vnode handle msgType:alter-confirm, alter confim msg is processed", TD_VID(pVnode));
|
||||||
TD_VID(pVnode));
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (!pVnode->config.hashChange) {
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = vnodeConsolidateAlterHashRange(pVnode, version);
|
||||||
|
pVnode->config.hashChange = false;
|
||||||
|
|
||||||
|
_exit:
|
||||||
pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
|
pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
|
||||||
pRsp->code = TSDB_CODE_SUCCESS;
|
pRsp->code = code;
|
||||||
pRsp->pCont = NULL;
|
pRsp->pCont = NULL;
|
||||||
pRsp->contLen = 0;
|
pRsp->contLen = 0;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue