fix msg error
This commit is contained in:
parent
68fd418c73
commit
b77149240f
|
@ -67,23 +67,21 @@ enum {
|
|||
#endif
|
||||
// Requests handled by DNODE
|
||||
TD_NEW_MSG_SEG(TDMT_DND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE, "dnode-alter-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_VNODE, "dnode-create-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE, "dnode-alter-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_DROP_VNODE, "dnode-drop-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_AUTH_VNODE, "dnode-auth-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_MNODE, "dnode-create-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_MNODE, "dnode-alter-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_DROP_MNODE, "dnode-drop-mnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_DND_NETWORK_TEST, "dnode-nettest", NULL, NULL)
|
||||
|
||||
// Requests handled by MNODE
|
||||
TD_NEW_MSG_SEG(TDMT_MND_MSG)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "mnode-connect", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TABLE, "mnode-create-table", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TABLE, "mnode-drop-table", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_ACCT, "mnode-create-acct", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_ACCT, "mnode-alter-acct", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_ACCT, "mnode-drop-acct", NULL, NULL)
|
||||
|
@ -107,6 +105,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "mnode-create-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "mnode-alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "mnode-drop-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STB_META, "mnode-stb-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "mnode-vgroup-list", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "mnode-kill-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
|
||||
|
@ -126,10 +125,15 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "vnode-submit", SSubmitReq, SSubmitRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "vnode-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "vnode-fetch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "vnode-create-table", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "vnode-alter-table", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TABLE, "vnode-drop-table", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "vnode-update-tag-val", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateTbReq, SVCreateTbRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONSUME, "vnode-mq-consume", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
|
||||
|
@ -139,9 +143,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_TASK, "vnode-cancel-task", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TASK, "vnode-drop-task", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-super-table", SVCreateTbReq, SVCreateTbRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||
|
|
|
@ -167,7 +167,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
|||
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
|
||||
if (pDcl->msgType == TDMT_MND_CREATE_TABLE) {
|
||||
if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
|
||||
struct SCatalog* pCatalog = NULL;
|
||||
|
||||
char buf[18] = {0};
|
||||
|
|
|
@ -285,6 +285,6 @@ void initMsgHandleFp() {
|
|||
handleRequestRspFp[TDMT_MND_SHOW_RETRIEVE] = processRetrieveMnodeRsp;
|
||||
handleRequestRspFp[TDMT_MND_CREATE_DB] = processCreateDbRsp;
|
||||
handleRequestRspFp[TDMT_MND_USE_DB] = processUseDbRsp;
|
||||
handleRequestRspFp[TDMT_MND_CREATE_TABLE] = processCreateTableRsp;
|
||||
handleRequestRspFp[TDMT_MND_CREATE_STB] = processCreateTableRsp;
|
||||
handleRequestRspFp[TDMT_MND_DROP_DB] = processDropDbRsp;
|
||||
}
|
|
@ -30,27 +30,30 @@
|
|||
#define INTERNAL_SECRET "_secret"
|
||||
|
||||
static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||
// msg from client to dnode
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg;
|
||||
// Requests handled by DNODE
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg;
|
||||
|
||||
// msg from client to mnode
|
||||
// Requests handled by MNODE
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = dndProcessMnodeWriteMsg;
|
||||
|
@ -75,55 +78,51 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
|||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STB_META)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||
|
||||
// message from client to dnode
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg;
|
||||
|
||||
// message from mnode to vnode
|
||||
// Requests handled by VNODE
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = dndProcessMnodeWriteMsg;
|
||||
|
||||
// message from mnode to dnode
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg;
|
||||
|
||||
// message from dnode to mnode
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
|
||||
}
|
||||
|
||||
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
|
@ -210,7 +209,8 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
if (pMsg->pCont == NULL) {
|
||||
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle);
|
||||
dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType),
|
||||
pMsg->ahandle);
|
||||
SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN};
|
||||
rpcSendResponse(&rspMsg);
|
||||
return;
|
||||
|
|
|
@ -42,7 +42,7 @@ TEST_F(DndTestProfile, 01_ConnectMsg) {
|
|||
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont;
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
pRsp->acctId = htonl(pRsp->acctId);
|
||||
pRsp->clusterId = htonl(pRsp->clusterId);
|
||||
pRsp->clusterId = htobe64(pRsp->clusterId);
|
||||
pRsp->connId = htonl(pRsp->connId);
|
||||
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
|
||||
|
||||
|
@ -174,7 +174,7 @@ TEST_F(DndTestProfile, 05_KillConnMsg) {
|
|||
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont;
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
pRsp->acctId = htonl(pRsp->acctId);
|
||||
pRsp->clusterId = htonl(pRsp->clusterId);
|
||||
pRsp->clusterId = htobe64(pRsp->clusterId);
|
||||
pRsp->connId = htonl(pRsp->connId);
|
||||
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
|
||||
|
||||
|
|
|
@ -133,7 +133,7 @@ TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
STableInfoMsg* pReq = (STableInfoMsg*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->tableFname, "1.d1.stb");
|
||||
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_VND_TABLE_META, pReq, contLen);
|
||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_STB_META, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, 0);
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t mndInitStb(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessCreateStbInRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessAlterStbInRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessDropStbInRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_TABLE_META, mndProcessStbMetaMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaMsg);
|
||||
|
||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STB, mndGetStbMeta);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
|
||||
|
@ -886,14 +886,19 @@ static void mndExtractTableName(char *tableId, char *name) {
|
|||
}
|
||||
|
||||
static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) {
|
||||
SMnode * pMnode = pMsg->pMnode;
|
||||
SSdb * pSdb = pMnode->pSdb;
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SStbObj *pStb = NULL;
|
||||
int32_t cols = 0;
|
||||
char * pWrite;
|
||||
char *pWrite;
|
||||
char prefix[64] = {0};
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db);
|
||||
if (pDb == NULL) {
|
||||
return TSDB_CODE_MND_INVALID_DB;
|
||||
}
|
||||
|
||||
tstrncpy(prefix, pShow->db, 64);
|
||||
strcat(prefix, TS_PATH_DELIMITER);
|
||||
int32_t prefixLen = (int32_t)strlen(prefix);
|
||||
|
@ -902,7 +907,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
|
|||
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
|
||||
if (pShow->pIter == NULL) break;
|
||||
|
||||
if (strncmp(pStb->name, prefix, prefixLen) != 0) {
|
||||
if (pStb->dbUid != pDb->uid) {
|
||||
sdbRelease(pSdb, pStb);
|
||||
continue;
|
||||
}
|
||||
|
@ -931,6 +936,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
|
|||
sdbRelease(pSdb, pStb);
|
||||
}
|
||||
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pShow->numOfReads += numOfRows;
|
||||
mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
|
||||
return numOfRows;
|
||||
|
|
|
@ -23,7 +23,7 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, tmsg_t type) {
|
|||
|
||||
tsize += taosEncodeFixedU64(buf, pReq->ver);
|
||||
switch (type) {
|
||||
case TDMT_MND_CREATE_TABLE:
|
||||
case TDMT_VND_CREATE_STB:
|
||||
tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq));
|
||||
break;
|
||||
case TDMT_VND_SUBMIT:
|
||||
|
@ -40,7 +40,7 @@ void *vnodeParseReq(void *buf, SVnodeReq *pReq, tmsg_t type) {
|
|||
buf = taosDecodeFixedU64(buf, &(pReq->ver));
|
||||
|
||||
switch (type) {
|
||||
case TDMT_MND_CREATE_TABLE:
|
||||
case TDMT_VND_CREATE_STB:
|
||||
buf = vnodeParseCreateTableReq(buf, &(pReq->ctReq));
|
||||
break;
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
// TODO: maybe need to clear the requst struct
|
||||
break;
|
||||
case TDMT_VND_DROP_STB:
|
||||
case TDMT_MND_DROP_TABLE:
|
||||
case TDMT_VND_DROP_TABLE:
|
||||
if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
|
|
|
@ -84,14 +84,14 @@ static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
|
|||
SVnodeReq vCreateSTbReq;
|
||||
vnodeSetCreateStbReq(&vCreateSTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
|
||||
|
||||
zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_MND_CREATE_TABLE);
|
||||
zs = vnodeBuildReq(NULL, &vCreateSTbReq, TDMT_VND_CREATE_STB);
|
||||
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
|
||||
pMsg->msgType = TDMT_MND_CREATE_TABLE;
|
||||
pMsg->msgType = TDMT_VND_CREATE_STB;
|
||||
pMsg->contLen = zs;
|
||||
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
|
||||
|
||||
pBuf = pMsg->pCont;
|
||||
vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_MND_CREATE_TABLE);
|
||||
vnodeBuildReq(&pBuf, &vCreateSTbReq, TDMT_VND_CREATE_STB);
|
||||
META_CLEAR_TB_CFG(&vCreateSTbReq);
|
||||
|
||||
tdFreeSchema(pSchema);
|
||||
|
@ -108,14 +108,14 @@ static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
|
|||
SVnodeReq vCreateCTbReq;
|
||||
vnodeSetCreateCtbReq(&vCreateCTbReq, tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
|
||||
|
||||
tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_MND_CREATE_TABLE);
|
||||
tz = vnodeBuildReq(NULL, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
|
||||
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
|
||||
pMsg->msgType = TDMT_MND_CREATE_TABLE;
|
||||
pMsg->msgType = TDMT_VND_CREATE_TABLE;
|
||||
pMsg->contLen = tz;
|
||||
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
|
||||
void *pBuf = pMsg->pCont;
|
||||
|
||||
vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_MND_CREATE_TABLE);
|
||||
vnodeBuildReq(&pBuf, &vCreateCTbReq, TDMT_VND_CREATE_TABLE);
|
||||
META_CLEAR_TB_CFG(&vCreateCTbReq);
|
||||
free(pTag);
|
||||
|
||||
|
|
|
@ -4495,7 +4495,7 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
|
|||
return code;
|
||||
}
|
||||
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE)? TDMT_MND_CREATE_TABLE:TDMT_MND_CREATE_STB;
|
||||
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE)? TDMT_VND_CREATE_TABLE:TDMT_MND_CREATE_STB;
|
||||
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
|
||||
// if ((code = doCheckForCreateFromStable(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
|
|
|
@ -1592,7 +1592,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
msgLen = (int32_t)(pMsg - (char*)pCreateTableMsg);
|
||||
pCreateTableMsg->contLen = htonl(msgLen);
|
||||
pCmd->payloadLen = msgLen;
|
||||
pCmd->msgType = TDMT_MND_CREATE_TABLE;
|
||||
pCmd->msgType = TDMT_VND_CREATE_TABLE;
|
||||
|
||||
assert(msgLen + minMsgSize() <= size);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue