diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 7833bdf139..b37c569867 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -220,6 +220,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "vnode-drop-ttl-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TRIM, "vnode-trim", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMMIT, "vnode-commit", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 16fe6c1b91..f3d7228e8a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -157,6 +157,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_INDEX, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; @@ -181,6 +183,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_HASHRANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CLIENT_REQUEST, mmPutMsgToSyncQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 3ce37a5f8e..1b53744c77 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -431,6 +431,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_INDEX, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 26a8ebb425..3e389ac2d5 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -75,6 +75,8 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIndexReq); + mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ec92316505..4f02c7f66b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -30,6 +30,8 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -323,6 +325,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp vnodeSyncCommit(pVnode); vnodeBegin(pVnode); goto _exit; + case TDMT_VND_CREATE_INDEX: + vnodeProcessCreateIndexReq(pVnode, version, pReq, len, pRsp); + break; + case TDMT_VND_DROP_INDEX: + vnodeProcessDropIndexReq(pVnode, version, pReq, len, pRsp); + break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return -1; @@ -1418,3 +1426,11 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq _err: return code; } +static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + // impl later + return TSDB_CODE_SUCCESS; +}