diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a4e430b9a8..1105e0c52b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5133,8 +5133,10 @@ int32_t tSerializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryComp } int32_t tDeserializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryCompactProgressReq* pReq){ + int32_t headLen = sizeof(SMsgHead); + SDecoder decoder = {0}; - tDecoderInit(&decoder, buf, bufLen); + tDecoderInit(&decoder, buf + headLen, bufLen - headLen); if (tStartDecode(&decoder) < 0) return -1; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a535ab17d7..9438f953a9 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -818,6 +818,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 4036200d73..b358efe691 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -120,6 +120,7 @@ void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); +int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index e9dbc5e659..7377850cf6 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -374,6 +374,15 @@ _exit: return code; } +int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg){ + SQueryCompactProgressReq req; + + if (tDeserializeSQueryCompactProgressReq(pMsg->pCont, pMsg->contLen, &req)) { + + } + return 0; +} + int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { SSyncState state = syncGetState(pVnode->sync); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5c3f44461c..1035c61a97 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -44,6 +44,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pRe static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) { int32_t code = 0; @@ -626,6 +627,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_COMPACT: vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp); goto _exit; + case TDMT_VND_KILL_COMPACT: + vnodeProcessKillCompactReq(pVnode, ver, pReq, len, pRsp); + break; case TDMT_SYNC_CONFIG_CHANGE: vnodeProcessConfigChangeReq(pVnode, ver, pReq, len, pRsp); break; @@ -743,6 +747,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); + case TDMT_VND_QUERY_COMPACT_PROGRESS: + return vnodeQueryCompactProgress(pVnode, pMsg); // case TDMT_VND_TMQ_CONSUME: // return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: @@ -2050,6 +2056,10 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp); } +static int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp){ + return 0; +} + static int32_t vnodeProcessStopCompactReq(SVnode *pVnode) { // TODO return 0;