diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h
index 8685a35312..5004c6571e 100644
--- a/source/dnode/mgmt/bnode/inc/bmInt.h
+++ b/source/dnode/mgmt/bnode/inc/bmInt.h
@@ -13,13 +13,14 @@
* along with this program. If not, see .
*/
-#ifndef _TD_DND_BNODE_H_
-#define _TD_DND_BNODE_H_
+#ifndef _TD_DND_BNODE_INT_H_
+#define _TD_DND_BNODE_INT_H_
+
+#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
-#include "dndInt.h"
SMgmtFp bmGetMgmtFp();
@@ -34,4 +35,4 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
}
#endif
-#endif /*_TD_DND_BNODE_H_*/
\ No newline at end of file
+#endif /*_TD_DND_BNODE_INT_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/bnode/inc/bmWorker.h b/source/dnode/mgmt/bnode/inc/bmWorker.h
index e69de29bb2..3a5c2e7169 100644
--- a/source/dnode/mgmt/bnode/inc/bmWorker.h
+++ b/source/dnode/mgmt/bnode/inc/bmWorker.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_DND_BNODE_WORKER_H_
+#define _TD_DND_BNODE_WORKER_H_
+
+#include "bmInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int32_t bmStartWorker(SDnode *pDnode);
+void bmStopWorker(SDnode *pDnode);
+void bmInitMsgFp(SMnodeMgmt *pMgmt);
+void bmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+int32_t bmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+int32_t bmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+void bmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+void bmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+
+void bmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void bmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void bmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_DND_BNODE_WORKER_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/dnode/inc/dndHandle.h b/source/dnode/mgmt/dnode/inc/dndHandle.h
new file mode 100644
index 0000000000..277c4f9f22
--- /dev/null
+++ b/source/dnode/mgmt/dnode/inc/dndHandle.h
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_DND_HADNLE_H_
+#define _TD_DND_HADNLE_H_
+
+#include "dndInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void dndInitMsgHandles(SMgmtWrapper *pWrapper);
+SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_DND_HADNLE_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/dnode/inc/dndInt.h b/source/dnode/mgmt/dnode/inc/dndInt.h
index ecea1f159f..1eb3ae556f 100644
--- a/source/dnode/mgmt/dnode/inc/dndInt.h
+++ b/source/dnode/mgmt/dnode/inc/dndInt.h
@@ -55,7 +55,7 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
-typedef enum { MNODE, NODE_MAX, VNODES, QNODE, SNODE, BNODE } ENodeType;
+typedef enum { DNODE, MNODE, NODE_MAX, VNODES, QNODE, SNODE, BNODE } ENodeType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
@@ -111,6 +111,9 @@ typedef struct {
SRWLatch latch;
SDnodeWorker mgmtWorker;
SDnodeWorker statusWorker;
+
+ //
+ SMsgHandle msgHandles[TDMT_MAX];
} SDnodeMgmt;
typedef struct {
@@ -140,6 +143,11 @@ typedef struct {
SRWLatch latch;
SDnodeWorker queryWorker;
SDnodeWorker fetchWorker;
+
+ //
+ SMsgHandle msgHandles[TDMT_MAX];
+ SProcObj *pProcess;
+ bool singleProc;
} SQnodeMgmt;
typedef struct {
@@ -149,6 +157,11 @@ typedef struct {
SSnode *pSnode;
SRWLatch latch;
SDnodeWorker writeWorker;
+
+ //
+ SMsgHandle msgHandles[TDMT_MAX];
+ SProcObj *pProcess;
+ bool singleProc;
} SSnodeMgmt;
typedef struct {
@@ -169,6 +182,11 @@ typedef struct {
SBnode *pBnode;
SRWLatch latch;
SDnodeWorker writeWorker;
+
+ //
+ SMsgHandle msgHandles[TDMT_MAX];
+ SProcObj *pProcess;
+ bool singleProc;
} SBnodeMgmt;
typedef struct {
@@ -179,6 +197,11 @@ typedef struct {
SFWorkerPool fetchPool;
SWWorkerPool syncPool;
SWWorkerPool writePool;
+
+ //
+ SMsgHandle msgHandles[TDMT_MAX];
+ SProcObj *pProcess;
+ bool singleProc;
} SVnodesMgmt;
typedef struct {
diff --git a/source/dnode/mgmt/dnode/inc/dndWorker.h b/source/dnode/mgmt/dnode/inc/dndWorker.h
index 22edb74c92..051ddaca26 100644
--- a/source/dnode/mgmt/dnode/inc/dndWorker.h
+++ b/source/dnode/mgmt/dnode/inc/dndWorker.h
@@ -27,6 +27,11 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, c
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
+
+
+void dndProcessMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+
+
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/mgmt/dnode/src/dndHandle.c b/source/dnode/mgmt/dnode/src/dndHandle.c
new file mode 100644
index 0000000000..cf4549615d
--- /dev/null
+++ b/source/dnode/mgmt/dnode/src/dndHandle.c
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "dndHandle.h"
+#include "dndWorker.h"
+
+static void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
+ SDnodeMgmt *pMgmt = pWrapper->pMgmt;
+ SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
+
+ pHandle->pWrapper = pWrapper;
+ pHandle->nodeMsgFp = nodeMsgFp;
+ pHandle->rpcMsgFp = dndProcessRpcMsg;
+}
+
+void dndInitMsgHandles(SMgmtWrapper *pWrapper) {
+ // Requests handled by DNODE
+ dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dndProcessMgmtMsg);
+
+ // Requests handled by MNODE
+ dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dndProcessMgmtMsg);
+ dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dndProcessMgmtMsg);
+}
+
+SMsgHandle dndGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
+ SDnodeMgmt *pMgmt = pWrapper->pMgmt;
+ return pMgmt->msgHandles[msgIndex];
+}
diff --git a/source/dnode/mgmt/dnode/src/dndTransport.c b/source/dnode/mgmt/dnode/src/dndTransport.c
index 7bcdd04278..00971bd934 100644
--- a/source/dnode/mgmt/dnode/src/dndTransport.c
+++ b/source/dnode/mgmt/dnode/src/dndTransport.c
@@ -28,135 +28,6 @@
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
-#if 0
-static void dndInitMsgFp(STransMgmt *pMgmt) {
- // Requests handled by DNODE
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg;
-
- // Requests handled by MNODE
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = mmProcessRpcMsg;
-
- // Requests handled by VNODE
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH_RSP)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLES_META)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONSUME)] = dndProcessVnodeQueryMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_QUERY)] = dndProcessVnodeQueryMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_CONNECT)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_DISCONNECT)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_RES_READY)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TASKS_STATUS)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CANCEL_TASK)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TASK)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessRpcMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB)] = dndProcessVnodeWriteMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
- pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
-}
-
-#endif
-
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt;
diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h
index aa0d567643..34d081bf34 100644
--- a/source/dnode/mgmt/mnode/inc/mmInt.h
+++ b/source/dnode/mgmt/mnode/inc/mmInt.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef _TD_DND_MNODE_MGMT_H_
-#define _TD_DND_MNODE_MGMT_H_
+#ifndef _TD_DND_MNODE_INT_H_
+#define _TD_DND_MNODE_INT_H_
#include "dndInt.h"
@@ -41,4 +41,4 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro
}
#endif
-#endif /*_TD_DND_MNODE_MGMT_H_*/
\ No newline at end of file
+#endif /*_TD_DND_MNODE_INT_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h
index 48e5ea7dee..3f8f79afce 100644
--- a/source/dnode/mgmt/qnode/inc/qmInt.h
+++ b/source/dnode/mgmt/qnode/inc/qmInt.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef _TD_DND_QNODE_H_
-#define _TD_DND_QNODE_H_
+#ifndef _TD_DND_QNODE_INT_H_
+#define _TD_DND_QNODE_INT_H_
#ifdef __cplusplus
extern "C" {
@@ -35,4 +35,4 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
}
#endif
-#endif /*_TD_DND_QNODE_H_*/
\ No newline at end of file
+#endif /*_TD_DND_QNODE_INT_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/qnode/inc/qmWorker.h b/source/dnode/mgmt/qnode/inc/qmWorker.h
index e69de29bb2..b268ed7d1e 100644
--- a/source/dnode/mgmt/qnode/inc/qmWorker.h
+++ b/source/dnode/mgmt/qnode/inc/qmWorker.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_DND_QNODE_WORKER_H_
+#define _TD_DND_QNODE_WORKER_H_
+
+#include "qmInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int32_t qmStartWorker(SDnode *pDnode);
+void qmStopWorker(SDnode *pDnode);
+void qmInitMsgFp(SMnodeMgmt *pMgmt);
+void qmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+int32_t qmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+int32_t qmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+void qmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+void qmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+
+void qmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void qmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void qmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_DND_QNODE_WORKER_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h
index b01bf7640f..f570927f9e 100644
--- a/source/dnode/mgmt/snode/inc/smInt.h
+++ b/source/dnode/mgmt/snode/inc/smInt.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef _TD_DND_SNODE_H_
-#define _TD_DND_SNODE_H_
+#ifndef _TD_DND_SNODE_INT_H_
+#define _TD_DND_SNODE_INT_H_
#ifdef __cplusplus
extern "C" {
@@ -34,4 +34,4 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
}
#endif
-#endif /*_TD_DND_SNODE_H_*/
\ No newline at end of file
+#endif /*_TD_DND_SNODE_INT_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/snode/inc/smWorker.h b/source/dnode/mgmt/snode/inc/smWorker.h
index e69de29bb2..3ad593faed 100644
--- a/source/dnode/mgmt/snode/inc/smWorker.h
+++ b/source/dnode/mgmt/snode/inc/smWorker.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_DND_SNODE_WORKER_H_
+#define _TD_DND_SNODE_WORKER_H_
+
+#include "smInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int32_t smStartWorker(SDnode *pDnode);
+void smStopWorker(SDnode *pDnode);
+void smInitMsgFp(SMnodeMgmt *pMgmt);
+void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+void smConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+
+void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void smProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void smProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_DND_SNODE_WORKER_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/vnode/inc/vmHandle.h b/source/dnode/mgmt/vnode/inc/vmHandle.h
index e69de29bb2..0cb0bae87f 100644
--- a/source/dnode/mgmt/vnode/inc/vmHandle.h
+++ b/source/dnode/mgmt/vnode/inc/vmHandle.h
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_DND_VNODE_HANDLE_H_
+#define _TD_DND_VNODE_HANDLE_H_
+
+#include "vmInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+void vmInitMsgHandles(SMgmtWrapper *pWrapper);
+SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_DND_VNODE_HANDLE_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h
index 512f0fb2fa..106d519e31 100644
--- a/source/dnode/mgmt/vnode/inc/vmInt.h
+++ b/source/dnode/mgmt/vnode/inc/vmInt.h
@@ -13,17 +13,17 @@
* along with this program. If not, see .
*/
-#ifndef _TD_DND_VNODES_H_
-#define _TD_DND_VNODES_H_
+#ifndef _TD_DND_VNODE_INT_H_
+#define _TD_DND_VNODE_INT_H_
+
+#include "dndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
-#include "dndInt.h"
SMgmtFp vmGetMgmtFp() ;
-
int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads);
@@ -45,4 +45,4 @@ int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq);
}
#endif
-#endif /*_TD_DND_VNODES_H_*/
\ No newline at end of file
+#endif /*_TD_DND_VNODE_INT_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h
index e69de29bb2..50ca076e10 100644
--- a/source/dnode/mgmt/vnode/inc/vmWorker.h
+++ b/source/dnode/mgmt/vnode/inc/vmWorker.h
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_DND_VNODE_WORKER_H_
+#define _TD_DND_VNODE_WORKER_H_
+
+#include "vmInt.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+int32_t vmStartWorker(SDnode *pDnode);
+void vmStopWorker(SDnode *pDnode);
+void vmInitMsgFp(SMnodeMgmt *pMgmt);
+void vmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
+int32_t vmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
+void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
+
+void vmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void vmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void vmProcessQueryMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+void vmProcessFetchMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_DND_VNODE_WORKER_H_*/
\ No newline at end of file
diff --git a/source/dnode/mgmt/vnode/src/vmHandle.c b/source/dnode/mgmt/vnode/src/vmHandle.c
index e69de29bb2..10c9db8690 100644
--- a/source/dnode/mgmt/vnode/src/vmHandle.c
+++ b/source/dnode/mgmt/vnode/src/vmHandle.c
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "vmHandle.h"
+#include "vmWorker.h"
+
+static void vmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
+ SVnodesMgmt *pMgmt = pWrapper->pMgmt;
+ SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
+
+ pHandle->pWrapper = pWrapper;
+ pHandle->nodeMsgFp = nodeMsgFp;
+ pHandle->rpcMsgFp = dndProcessRpcMsg;
+}
+
+void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
+ // Requests handled by VNODE
+ vmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg);
+ vmSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg);
+}
+
+SMsgHandle vmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
+ SVnodesMgmt *pMgmt = pWrapper->pMgmt;
+ return pMgmt->msgHandles[msgIndex];
+}