enh: add alter vnode hashrange request
This commit is contained in:
parent
03dc6e55b5
commit
8bc0d4ba74
|
@ -1287,6 +1287,17 @@ typedef struct {
|
||||||
int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
|
int32_t tSerializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
|
||||||
int32_t tDeserializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
|
int32_t tDeserializeSDisableVnodeWriteReq(void* buf, int32_t bufLen, SDisableVnodeWriteReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t srcVgId;
|
||||||
|
int32_t dstVgId;
|
||||||
|
uint32_t hashBegin;
|
||||||
|
uint32_t hashEnd;
|
||||||
|
int64_t reserved;
|
||||||
|
} SAlterVnodeHashRangeReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);
|
||||||
|
int32_t tDeserializeSAlterVnodeHashRangeReq(void* buf, int32_t bufLen, SAlterVnodeHashRangeReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
|
|
@ -4146,6 +4146,40 @@ int32_t tDeserializeSDisableVnodeWriteReq(void *buf, int32_t bufLen, SDisableVno
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->srcVgId) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->dstVgId) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->hashBegin) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->hashEnd) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->reserved) < 0) return -1;
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSAlterVnodeHashRangeReq(void *buf, int32_t bufLen, SAlterVnodeHashRangeReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->srcVgId) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->dstVgId) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->hashBegin) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->hashEnd) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->reserved) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
|
int32_t tSerializeSKillQueryReq(void *buf, int32_t bufLen, SKillQueryReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
|
@ -87,8 +87,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
SArray *vmGetMsgHandles();
|
SArray *vmGetMsgHandles();
|
||||||
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// vmFile.c
|
// vmFile.c
|
||||||
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
|
||||||
|
|
|
@ -302,7 +302,20 @@ int32_t vmProcessDisableVnodeWriteReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
SAlterVnodeHashRangeReq req = {0};
|
||||||
|
if (tDeserializeSAlterVnodeHashRangeReq(pMsg->pCont, pMsg->contLen, &req) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dInfo("vgId:%d, alter hashrange msg will be processed, dstVgId:%d, begin:%u, end:%u", req.srcVgId, req.dstVgId,
|
||||||
|
req.hashBegin, req.hashEnd);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
SAlterVnodeReplicaReq alterReq = {0};
|
SAlterVnodeReplicaReq alterReq = {0};
|
||||||
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
|
if (tDeserializeSAlterVnodeReplicaReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -473,7 +486,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DISABLE_WRITE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -41,11 +41,14 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
code = vmProcessDropVnodeReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_ALTER_REPLICA:
|
case TDMT_VND_ALTER_REPLICA:
|
||||||
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
|
code = vmProcessAlterVnodeReplicaReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_DISABLE_WRITE:
|
case TDMT_VND_DISABLE_WRITE:
|
||||||
code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg);
|
code = vmProcessDisableVnodeWriteReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
|
case TDMT_VND_ALTER_HASHRANGE:
|
||||||
|
code = vmProcessAlterHashRangeReq(pMgmt, pMsg);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
dGError("msg:%p, not processed in vnode-mgmt queue", pMsg);
|
||||||
|
|
|
@ -420,6 +420,33 @@ static void *mndBuildDisableVnodeWriteReq(SMnode *pMnode, SDbObj *pDb, int32_t v
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *mndBuildAlterVnodeHashRangeReq(SMnode *pMnode, SVgObj *pVgroup, int32_t dstVgId, int32_t *pContLen) {
|
||||||
|
SAlterVnodeHashRangeReq alterReq = {
|
||||||
|
.srcVgId = pVgroup->vgId,
|
||||||
|
.dstVgId = dstVgId,
|
||||||
|
.hashBegin = pVgroup->hashBegin,
|
||||||
|
.hashEnd = pVgroup->hashEnd,
|
||||||
|
};
|
||||||
|
|
||||||
|
mInfo("vgId:%d, build alter vnode hashrange req, dstVgId:%d, begin:%u, end:%u", pVgroup->vgId, dstVgId,
|
||||||
|
pVgroup->hashBegin, pVgroup->hashEnd);
|
||||||
|
int32_t contLen = tSerializeSAlterVnodeHashRangeReq(NULL, 0, &alterReq);
|
||||||
|
if (contLen < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
if (pReq == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSAlterVnodeHashRangeReq(pReq, contLen, &alterReq);
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
||||||
SDropVnodeReq dropReq = {0};
|
SDropVnodeReq dropReq = {0};
|
||||||
dropReq.dnodeId = pDnode->id;
|
dropReq.dnodeId = pDnode->id;
|
||||||
|
@ -1077,7 +1104,25 @@ int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { return 0; }
|
static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t dstVgId) {
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void *pReq = mndBuildAlterVnodeHashRangeReq(pMnode, pVgroup, dstVgId, &contLen);
|
||||||
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
|
action.pCont = pReq;
|
||||||
|
action.contLen = contLen;
|
||||||
|
action.msgType = TDMT_VND_ALTER_HASHRANGE;
|
||||||
|
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
@ -1854,12 +1899,6 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
||||||
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg1.vgId, i, newVg1.vnodeGid[i].dnodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRaw = mndVgroupActionEncode(pVgroup);
|
|
||||||
if (pRaw == NULL) goto _OVER;
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) goto _OVER;
|
|
||||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_DROPPING);
|
|
||||||
pRaw = NULL;
|
|
||||||
|
|
||||||
SVgObj newVg2 = {0};
|
SVgObj newVg2 = {0};
|
||||||
memcpy(&newVg2, &newVg1, sizeof(SVgObj));
|
memcpy(&newVg2, &newVg1, sizeof(SVgObj));
|
||||||
newVg1.replica = 1;
|
newVg1.replica = 1;
|
||||||
|
@ -1882,8 +1921,13 @@ static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj
|
||||||
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
|
mInfo("vgId:%d, vnode:%d dnode:%d", newVg2.vgId, i, newVg2.vnodeGid[i].dnodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
|
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
|
||||||
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, pDb, &newVg2) != 0) goto _OVER;
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg1, maxVgId) != 0) goto _OVER;
|
||||||
|
newVg1.vgId = maxVgId;
|
||||||
|
|
||||||
|
maxVgId++;
|
||||||
|
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, &newVg2, maxVgId) != 0) goto _OVER;
|
||||||
|
newVg2.vgId = maxVgId;
|
||||||
|
|
||||||
// adjust vgroup replica
|
// adjust vgroup replica
|
||||||
if (pDb->cfg.replications != newVg1.replica) {
|
if (pDb->cfg.replications != newVg1.replica) {
|
||||||
|
|
|
@ -24,7 +24,6 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
|
||||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -313,9 +312,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_ALTER_HASHRANGE:
|
|
||||||
vnodeProcessAlterHashRangeReq(pVnode, version, pReq, len, pRsp);
|
|
||||||
break;
|
|
||||||
case TDMT_VND_ALTER_CONFIG:
|
case TDMT_VND_ALTER_CONFIG:
|
||||||
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
|
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
|
||||||
break;
|
break;
|
||||||
|
@ -1246,16 +1242,6 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterHashRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
|
||||||
vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));
|
|
||||||
|
|
||||||
// todo
|
|
||||||
// 1. stop work
|
|
||||||
// 2. adjust hash range / compact / remove wals / rename vgroups
|
|
||||||
// 3. reload sync
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
bool walChanged = false;
|
bool walChanged = false;
|
||||||
bool tsdbChanged = false;
|
bool tsdbChanged = false;
|
||||||
|
|
Loading…
Reference in New Issue