message handle

This commit is contained in:
dmchen 2023-12-07 11:42:32 +00:00
parent aee070918b
commit 348bef49ec
5 changed files with 25 additions and 1 deletions

View File

@ -5133,8 +5133,10 @@ int32_t tSerializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryComp
}
int32_t tDeserializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryCompactProgressReq* pReq){
int32_t headLen = sizeof(SMsgHead);
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
tDecoderInit(&decoder, buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;

View File

@ -818,6 +818,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -120,6 +120,7 @@ void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);

View File

@ -374,6 +374,15 @@ _exit:
return code;
}
int32_t vnodeQueryCompactProgress(SVnode *pVnode, SRpcMsg *pMsg){
SQueryCompactProgressReq req;
if (tDeserializeSQueryCompactProgressReq(pMsg->pCont, pMsg->contLen, &req)) {
}
return 0;
}
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
SSyncState state = syncGetState(pVnode->sync);

View File

@ -44,6 +44,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pRe
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
int32_t code = 0;
@ -626,6 +627,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_COMPACT:
vnodeProcessCompactVnodeReq(pVnode, ver, pReq, len, pRsp);
goto _exit;
case TDMT_VND_KILL_COMPACT:
vnodeProcessKillCompactReq(pVnode, ver, pReq, len, pRsp);
break;
case TDMT_SYNC_CONFIG_CHANGE:
vnodeProcessConfigChangeReq(pVnode, ver, pReq, len, pRsp);
break;
@ -743,6 +747,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_QUERY_COMPACT_PROGRESS:
return vnodeQueryCompactProgress(pVnode, pMsg);
// case TDMT_VND_TMQ_CONSUME:
// return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
@ -2050,6 +2056,10 @@ static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pR
return vnodeProcessCompactVnodeReqImpl(pVnode, ver, pReq, len, pRsp);
}
static int32_t vnodeProcessKillCompactReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp){
return 0;
}
static int32_t vnodeProcessStopCompactReq(SVnode *pVnode) {
// TODO
return 0;