diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c
index 7d6768b144..d835a3c0aa 100644
--- a/src/client/src/tscSql.c
+++ b/src/client/src/tscSql.c
@@ -77,7 +77,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
tscMgmtIpSet.inUse = 0;
tscMgmtIpSet.numOfIps = 1;
strcpy(tscMgmtIpSet.fqdn[0], ip);
- tscMgmtIpSet.port[0] = port? port: tsMnodeShellPort;
+ tscMgmtIpSet.port[0] = port? port: tsDnodeShellPort;
} else {
if (tsFirst[0] != 0) {
taosGetFqdnPortFromEp(tsFirst, tscMgmtIpSet.fqdn[tscMgmtIpSet.numOfIps], &tscMgmtIpSet.port[tscMgmtIpSet.numOfIps]);
@@ -100,7 +100,7 @@ STscObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
strncpy(pObj->user, user, TSDB_USER_LEN);
taosEncryptPass((uint8_t *)pass, strlen(pass), pObj->pass);
- pObj->mgmtPort = port ? port : tsMnodeShellPort;
+ pObj->mgmtPort = port ? port : tsDnodeShellPort;
if (db) {
int32_t len = strlen(db);
diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c
index aa3c836ba0..7ddeed4565 100644
--- a/src/client/src/tscSystem.c
+++ b/src/client/src/tscSystem.c
@@ -56,7 +56,7 @@ int32_t tscInitRpc(const char *user, const char *secret) {
if (pDnodeConn == NULL) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
- rpcInit.label = "TSC-vnode";
+ rpcInit.label = "TSC";
rpcInit.numOfThreads = tscNumOfThreads;
rpcInit.cfp = tscProcessMsgFromServer;
rpcInit.sessions = tsMaxVnodeConnections;
diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h
index dd0dd230fd..ef12ece393 100644
--- a/src/common/inc/tglobal.h
+++ b/src/common/inc/tglobal.h
@@ -55,10 +55,8 @@ extern char tsFirst[];
extern char tsSecond[];
extern char tsLocalEp[];
extern uint16_t tsServerPort;
-extern uint16_t tsMnodeDnodePort;
-extern uint16_t tsMnodeShellPort;
extern uint16_t tsDnodeShellPort;
-extern uint16_t tsDnodeMnodePort;
+extern uint16_t tsDnodeDnodePort;
extern uint16_t tsSyncPort;
extern int32_t tsStatusInterval;
diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c
index b482ca64f4..0af0710c85 100644
--- a/src/common/src/tglobal.c
+++ b/src/common/src/tglobal.c
@@ -66,11 +66,9 @@ char tsSecond[TSDB_FQDN_LEN] = {0};
char tsArbitrator[TSDB_FQDN_LEN] = {0};
char tsLocalEp[TSDB_FQDN_LEN] = {0}; // Local End Point, hostname:port
uint16_t tsServerPort = 6030;
-uint16_t tsMnodeShellPort = 6030; // udp[6030-6034] tcp[6030]
-uint16_t tsDnodeShellPort = 6035; // udp[6035-6039] tcp[6035]
-uint16_t tsMnodeDnodePort = 6040; // udp/tcp
-uint16_t tsDnodeMnodePort = 6045; // udp/tcp
-uint16_t tsSyncPort = 6050;
+uint16_t tsDnodeShellPort = 6030; // udp[6035-6039] tcp[6035]
+uint16_t tsDnodeDnodePort = 6035; // udp/tcp
+uint16_t tsSyncPort = 6040;
int32_t tsStatusInterval = 1; // second
int32_t tsShellActivityTimer = 3; // second
@@ -1245,8 +1243,7 @@ bool taosCheckGlobalCfg() {
tsVersion = 10 * tsVersion;
tsDnodeShellPort = tsServerPort + TSDB_PORT_DNODESHELL; // udp[6035-6039] tcp[6035]
- tsMnodeDnodePort = tsServerPort + TSDB_PORT_MNODEDNODE; // udp/tcp
- tsDnodeMnodePort = tsServerPort + TSDB_PORT_DNODEMNODE; // udp/tcp
+ tsDnodeDnodePort = tsServerPort + TSDB_PORT_DNODEDNODE; // udp/tcp
tsSyncPort = tsServerPort + TSDB_PORT_SYNC;
return true;
diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c
index 62b9a41494..e4f3142b89 100644
--- a/src/cq/src/cqMain.c
+++ b/src/cq/src/cqMain.c
@@ -26,10 +26,10 @@
#include "tcq.h"
#include "taos.h"
-#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
-#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
-#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
-#define cPrint(...) {taosPrintLog("WAL ", 255, __VA_ARGS__);}
+#define cError(...) if (cqDebugFlag & DEBUG_ERROR) {taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__);}
+#define cWarn(...) if (cqDebugFlag & DEBUG_WARN) {taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__);}
+#define cTrace(...) if (cqDebugFlag & DEBUG_TRACE) {taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__);}
+#define cPrint(...) {taosPrintLog("CQ ", 255, __VA_ARGS__);}
typedef struct {
int vgId;
diff --git a/src/dnode/inc/dnodeMClient.h b/src/dnode/inc/dnodeDnode.h
similarity index 80%
rename from src/dnode/inc/dnodeMClient.h
rename to src/dnode/inc/dnodeDnode.h
index 6d413ada88..2ce8d80c0f 100644
--- a/src/dnode/inc/dnodeMClient.h
+++ b/src/dnode/inc/dnodeDnode.h
@@ -13,16 +13,17 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_DNODE_MCLIENT_H
-#define TDENGINE_DNODE_MCLIENT_H
+#ifndef TDENGINE_DNODE_DNODE_H
+#define TDENGINE_DNODE_DNODE_H
#ifdef __cplusplus
extern "C" {
#endif
-int32_t dnodeInitMClient();
-void dnodeCleanupMClient();
-void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
+int32_t dnodeInitServer();
+void dnodeCleanupServer();
+int32_t dnodeInitClient();
+void dnodeCleanupClient();
#ifdef __cplusplus
}
diff --git a/src/dnode/inc/dnodeMnode.h b/src/dnode/inc/dnodeMnode.h
deleted file mode 100644
index 76a65a06c9..0000000000
--- a/src/dnode/inc/dnodeMnode.h
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef TDENGINE_DNODE_MNODE_H
-#define TDENGINE_DNODE_MNODE_H
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-int32_t dnodeInitMnode();
-void dnodeCleanupMnode();
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif
diff --git a/src/dnode/src/dnodeClient.c b/src/dnode/src/dnodeClient.c
new file mode 100644
index 0000000000..aa3ec0595f
--- /dev/null
+++ b/src/dnode/src/dnodeClient.c
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "os.h"
+#include "taosmsg.h"
+#include "trpc.h"
+#include "tutil.h"
+#include "tglobal.h"
+#include "dnode.h"
+#include "dnodeLog.h"
+#include "dnodeMgmt.h"
+
+static void *tsDnodeClientRpc;
+static void (*dnodeProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
+static void dnodeProcessRspFromDnode(SRpcMsg *pMsg);
+extern void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet);
+
+int32_t dnodeInitClient() {
+ SRpcInit rpcInit;
+ memset(&rpcInit, 0, sizeof(rpcInit));
+ rpcInit.label = "DND-C";
+ rpcInit.numOfThreads = 1;
+ rpcInit.cfp = dnodeProcessRspFromDnode;
+ rpcInit.ufp = dnodeUpdateIpSet;
+ rpcInit.sessions = 100;
+ rpcInit.connType = TAOS_CONN_CLIENT;
+ rpcInit.idleTime = tsShellActivityTimer * 2000;
+ rpcInit.user = "t";
+ rpcInit.ckey = "key";
+ rpcInit.secret = "secret";
+
+ tsDnodeClientRpc = rpcOpen(&rpcInit);
+ if (tsDnodeClientRpc == NULL) {
+ dError("failed to init mnode rpc client");
+ return -1;
+ }
+
+ dPrint("inter-dndoes rpc client is opened");
+ return 0;
+}
+
+void dnodeCleanupClient() {
+ if (tsDnodeClientRpc) {
+ rpcClose(tsDnodeClientRpc);
+ tsDnodeClientRpc = NULL;
+ dPrint("inter-dnodes rpc client is closed");
+ }
+}
+
+static void dnodeProcessRspFromDnode(SRpcMsg *pMsg) {
+ if (dnodeProcessDnodeRspFp[pMsg->msgType]) {
+ (*dnodeProcessDnodeRspFp[pMsg->msgType])(pMsg);
+ } else {
+ dError("%s is not processed", taosMsg[pMsg->msgType]);
+ }
+ rpcFreeCont(pMsg->pCont);
+}
+
+void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
+ dnodeProcessDnodeRspFp[msgType] = fp;
+}
+
+void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
+ rpcSendRequest(tsDnodeClientRpc, ipSet, rpcMsg);
+}
diff --git a/src/dnode/src/dnodeMClient.c b/src/dnode/src/dnodeMClient.c
deleted file mode 100644
index 3aa863799b..0000000000
--- a/src/dnode/src/dnodeMClient.c
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "cJSON.h"
-#include "taosmsg.h"
-#include "trpc.h"
-#include "tutil.h"
-#include "tsync.h"
-#include "ttime.h"
-#include "ttimer.h"
-#include "tbalance.h"
-#include "tglobal.h"
-#include "vnode.h"
-#include "mnode.h"
-#include "dnode.h"
-#include "dnodeLog.h"
-#include "dnodeMClient.h"
-#include "dnodeModule.h"
-#include "dnodeMgmt.h"
-
-#define MPEER_CONTENT_LEN 2000
-
-static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
-static bool dnodeReadMnodeInfos();
-static void dnodeSaveMnodeInfos();
-static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
-static bool dnodeReadDnodeCfg();
-static void dnodeSaveDnodeCfg();
-static void dnodeProcessRspFromMnode(SRpcMsg *pMsg);
-static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
-static void dnodeSendStatusMsg(void *handle, void *tmrId);
-static void (*tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
-
-static void *tsDnodeMClientRpc = NULL;
-static void *tsDnodeTmr = NULL;
-static void *tsStatusTimer = NULL;
-static uint32_t tsRebootTime;
-
-static SRpcIpSet tsMnodeIpSet = {0};
-static SDMMnodeInfos tsMnodeInfos = {0};
-static SDMDnodeCfg tsDnodeCfg = {0};
-
-void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
- dTrace("mgmt IP list is changed for ufp is called");
- tsMnodeIpSet = *pIpSet;
-}
-
-void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) {
- SRpcIpSet *ipSet = ipSetRaw;
- ipSet->numOfIps = tsMnodeInfos.nodeNum;
- ipSet->inUse = tsMnodeInfos.inUse;
- for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) {
- taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]);
- ipSet->port[i] += TSDB_PORT_MNODEDNODE;
- }
-}
-
-int32_t dnodeInitMClient() {
- dnodeReadDnodeCfg();
- tsRebootTime = taosGetTimestampSec();
-
- tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
- if (tsDnodeTmr == NULL) {
- dError("failed to init dnode timer");
- return -1;
- }
-
- if (!dnodeReadMnodeInfos()) {
- memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet));
- memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
- tsMnodeIpSet.numOfIps = 1;
- taosGetFqdnPortFromEp(tsFirst, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]);
- tsMnodeIpSet.port[0] += TSDB_PORT_MNODEDNODE;
- if (strcmp(tsSecond, tsFirst) != 0) {
- tsMnodeIpSet.numOfIps = 2;
- taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]);
- tsMnodeIpSet.port[1] += TSDB_PORT_MNODEDNODE;
- }
- } else {
- tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
- tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
- for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
- taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
- tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE;
- }
- }
-
- SRpcInit rpcInit;
- memset(&rpcInit, 0, sizeof(rpcInit));
- rpcInit.label = "DND-MC";
- rpcInit.numOfThreads = 1;
- rpcInit.cfp = dnodeProcessRspFromMnode;
- rpcInit.ufp = dnodeUpdateIpSet;
- rpcInit.sessions = 100;
- rpcInit.connType = TAOS_CONN_CLIENT;
- rpcInit.idleTime = tsShellActivityTimer * 2000;
- rpcInit.user = "t";
- rpcInit.ckey = "key";
- rpcInit.secret = "secret";
-
- tsDnodeMClientRpc = rpcOpen(&rpcInit);
- if (tsDnodeMClientRpc == NULL) {
- dError("failed to init mnode rpc client");
- return -1;
- }
-
- tsDnodeProcessMgmtRspFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
- taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
-
- dPrint("mnode rpc client is opened");
- return 0;
-}
-
-void dnodeCleanupMClient() {
- if (tsStatusTimer != NULL) {
- taosTmrStopA(&tsStatusTimer);
- tsStatusTimer = NULL;
- }
-
- if (tsDnodeTmr != NULL) {
- taosTmrCleanUp(tsDnodeTmr);
- tsDnodeTmr = NULL;
- }
-
- if (tsDnodeMClientRpc) {
- rpcClose(tsDnodeMClientRpc);
- tsDnodeMClientRpc = NULL;
- dPrint("mnode rpc client is closed");
- }
-}
-
-static void dnodeProcessRspFromMnode(SRpcMsg *pMsg) {
- if (tsDnodeProcessMgmtRspFp[pMsg->msgType]) {
- (*tsDnodeProcessMgmtRspFp[pMsg->msgType])(pMsg);
- } else {
- dError("%s is not processed in dnode mclient", taosMsg[pMsg->msgType]);
- SRpcMsg rpcRsp = {.pCont = 0, .contLen = 0, .code = TSDB_CODE_OPS_NOT_SUPPORT, .handle = pMsg->handle};
- rpcSendResponse(&rpcRsp);
- }
-
- rpcFreeCont(pMsg->pCont);
-}
-
-static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
- if (pMsg->code != TSDB_CODE_SUCCESS) {
- dError("status rsp is received, error:%s", tstrerror(pMsg->code));
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- return;
- }
-
- SDMStatusRsp *pStatusRsp = pMsg->pCont;
- SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
- if (pMnodes->nodeNum <= 0) {
- dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- return;
- }
-
- SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
- pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
- pCfg->moduleStatus = htonl(pCfg->moduleStatus);
- pCfg->dnodeId = htonl(pCfg->dnodeId);
-
- for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
- SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
- pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
- }
-
- SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
- for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) {
- pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId);
- }
-
- dnodeProcessModuleStatus(pCfg->moduleStatus);
- dnodeUpdateDnodeCfg(pCfg);
- dnodeUpdateMnodeInfos(pMnodes);
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
-}
-
-static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
- bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
- bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0);
- if (!(mnodesChanged || mnodesNotInit)) return;
-
- memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
-
- tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
- tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
- for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
- taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
- tsMnodeIpSet.port[i] += TSDB_PORT_MNODEDNODE;
- }
-
- dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
- for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
- dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
- }
-
- dnodeSaveMnodeInfos();
- sdbUpdateSync();
-}
-
-void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
- if (tsDnodeMClientRpc) {
- rpcSendRequest(tsDnodeMClientRpc, &tsMnodeIpSet, rpcMsg);
- }
-}
-
-static bool dnodeReadMnodeInfos() {
- char ipFile[TSDB_FILENAME_LEN] = {0};
- sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
- FILE *fp = fopen(ipFile, "r");
- if (!fp) {
- dTrace("failed to read mnode mgmtIpList.json, file not exist");
- return false;
- }
-
- bool ret = false;
- int maxLen = 2000;
- char *content = calloc(1, maxLen + 1);
- int len = fread(content, 1, maxLen, fp);
- if (len <= 0) {
- free(content);
- fclose(fp);
- dError("failed to read mnode mgmtIpList.json, content is null");
- return false;
- }
-
- cJSON* root = cJSON_Parse(content);
- if (root == NULL) {
- dError("failed to read mnode mgmtIpList.json, invalid json format");
- goto PARSE_OVER;
- }
-
- cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
- if (!inUse || inUse->type != cJSON_Number) {
- dError("failed to read mnode mgmtIpList.json, inUse not found");
- goto PARSE_OVER;
- }
- tsMnodeInfos.inUse = inUse->valueint;
-
- cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
- if (!nodeNum || nodeNum->type != cJSON_Number) {
- dError("failed to read mnode mgmtIpList.json, nodeNum not found");
- goto PARSE_OVER;
- }
- tsMnodeInfos.nodeNum = nodeNum->valueint;
-
- cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
- if (!nodeInfos || nodeInfos->type != cJSON_Array) {
- dError("failed to read mnode mgmtIpList.json, nodeInfos not found");
- goto PARSE_OVER;
- }
-
- int size = cJSON_GetArraySize(nodeInfos);
- if (size != tsMnodeInfos.nodeNum) {
- dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched");
- goto PARSE_OVER;
- }
-
- for (int i = 0; i < size; ++i) {
- cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
- if (nodeInfo == NULL) continue;
-
- cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
- if (!nodeId || nodeId->type != cJSON_Number) {
- dError("failed to read mnode mgmtIpList.json, nodeId not found");
- goto PARSE_OVER;
- }
- tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
-
- cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
- if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
- dError("failed to read mnode mgmtIpList.json, nodeName not found");
- goto PARSE_OVER;
- }
- strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN);
- }
-
- ret = true;
-
- dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
- for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
- dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
- }
-
-PARSE_OVER:
- free(content);
- cJSON_Delete(root);
- fclose(fp);
- return ret;
-}
-
-static void dnodeSaveMnodeInfos() {
- char ipFile[TSDB_FILENAME_LEN] = {0};
- sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
- FILE *fp = fopen(ipFile, "w");
- if (!fp) return;
-
- int32_t len = 0;
- int32_t maxLen = 2000;
- char * content = calloc(1, maxLen + 1);
-
- len += snprintf(content + len, maxLen - len, "{\n");
- len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMnodeInfos.inUse);
- len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMnodeInfos.nodeNum);
- len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
- for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
- len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId);
- len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp);
- if (i < tsMnodeInfos.nodeNum -1) {
- len += snprintf(content + len, maxLen - len, " },{\n");
- } else {
- len += snprintf(content + len, maxLen - len, " }]\n");
- }
- }
- len += snprintf(content + len, maxLen - len, "}\n");
-
- fwrite(content, 1, len, fp);
- fclose(fp);
- free(content);
-
- dPrint("save mnode iplist successed");
-}
-
-char *dnodeGetMnodeMasterEp() {
- return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp;
-}
-
-void* dnodeGetMnodeInfos() {
- return &tsMnodeInfos;
-}
-
-static void dnodeSendStatusMsg(void *handle, void *tmrId) {
- if (tsDnodeTmr == NULL) {
- dError("dnode timer is already released");
- return;
- }
-
- if (tsStatusTimer == NULL) {
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- dError("failed to start status timer");
- return;
- }
-
- int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
- SDMStatusMsg *pStatus = rpcMallocCont(contLen);
- if (pStatus == NULL) {
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
- dError("failed to malloc status message");
- return;
- }
-
- //strcpy(pStatus->dnodeName, tsDnodeName);
- pStatus->version = htonl(tsVersion);
- pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
- strcpy(pStatus->dnodeEp, tsLocalEp);
- pStatus->lastReboot = htonl(tsRebootTime);
- pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
- pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
- pStatus->diskAvailable = tsAvailDataDirGB;
- pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
-
- vnodeBuildStatusMsg(pStatus);
- contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
- pStatus->openVnodes = htons(pStatus->openVnodes);
-
- SRpcMsg rpcMsg = {
- .pCont = pStatus,
- .contLen = contLen,
- .msgType = TSDB_MSG_TYPE_DM_STATUS
- };
-
- dnodeSendMsgToMnode(&rpcMsg);
-}
-
-static bool dnodeReadDnodeCfg() {
- char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
- sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
-
- FILE *fp = fopen(dnodeCfgFile, "r");
- if (!fp) {
- dTrace("failed to read dnodeCfg.json, file not exist");
- return false;
- }
-
- bool ret = false;
- int maxLen = 100;
- char *content = calloc(1, maxLen + 1);
- int len = fread(content, 1, maxLen, fp);
- if (len <= 0) {
- free(content);
- fclose(fp);
- dError("failed to read dnodeCfg.json, content is null");
- return false;
- }
-
- cJSON* root = cJSON_Parse(content);
- if (root == NULL) {
- dError("failed to read dnodeCfg.json, invalid json format");
- goto PARSE_CFG_OVER;
- }
-
- cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
- if (!dnodeId || dnodeId->type != cJSON_Number) {
- dError("failed to read dnodeCfg.json, dnodeId not found");
- goto PARSE_CFG_OVER;
- }
- tsDnodeCfg.dnodeId = dnodeId->valueint;
-
- ret = true;
-
- dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
-
-PARSE_CFG_OVER:
- free(content);
- cJSON_Delete(root);
- fclose(fp);
- return ret;
-}
-
-static void dnodeSaveDnodeCfg() {
- char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
- sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
-
- FILE *fp = fopen(dnodeCfgFile, "w");
- if (!fp) return;
-
- int32_t len = 0;
- int32_t maxLen = 100;
- char * content = calloc(1, maxLen + 1);
-
- len += snprintf(content + len, maxLen - len, "{\n");
- len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId);
- len += snprintf(content + len, maxLen - len, "}\n");
-
- fwrite(content, 1, len, fp);
- fclose(fp);
- free(content);
-
- dPrint("save dnodeId successed");
-}
-
-void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
- if (tsDnodeCfg.dnodeId == 0) {
- dPrint("dnodeId is set to %d", pCfg->dnodeId);
- tsDnodeCfg.dnodeId = pCfg->dnodeId;
- dnodeSaveDnodeCfg();
- }
-}
-
-int32_t dnodeGetDnodeId() {
- return tsDnodeCfg.dnodeId;
-}
diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c
index 940b884927..f6bd026703 100644
--- a/src/dnode/src/dnodeMain.c
+++ b/src/dnode/src/dnodeMain.c
@@ -23,9 +23,8 @@
#include "tglobal.h"
#include "dnode.h"
#include "dnodeLog.h"
-#include "dnodeMClient.h"
#include "dnodeMgmt.h"
-#include "dnodeMnode.h"
+#include "dnodeDnode.h"
#include "dnodeModule.h"
#include "dnodeRead.h"
#include "dnodeShell.h"
@@ -167,9 +166,9 @@ static int32_t dnodeInitSystem() {
if (dnodeInitStorage() != 0) return -1;
if (dnodeInitRead() != 0) return -1;
if (dnodeInitWrite() != 0) return -1;
- if (dnodeInitMClient() != 0) return -1;
+ if (dnodeInitClient() != 0) return -1;
if (dnodeInitModules() != 0) return -1;
- if (dnodeInitMnode() != 0) return -1;
+ if (dnodeInitServer() != 0) return -1;
if (dnodeInitMgmt() != 0) return -1;
if (dnodeInitShell() != 0) return -1;
@@ -185,9 +184,9 @@ static void dnodeCleanUpSystem() {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
dnodeCleanupShell();
- dnodeCleanupMnode();
+ dnodeCleanupServer();
dnodeCleanupMgmt();
- dnodeCleanupMClient();
+ dnodeCleanupClient();
dnodeCleanupWrite();
dnodeCleanupRead();
dnodeCleanUpModules();
diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c
index fbf1ceea71..db25cfd23b 100644
--- a/src/dnode/src/dnodeMgmt.c
+++ b/src/dnode/src/dnodeMgmt.c
@@ -15,19 +15,47 @@
#define _DEFAULT_SOURCE
#include "os.h"
+#include "cJSON.h"
#include "ihash.h"
#include "taoserror.h"
#include "taosmsg.h"
+#include "ttime.h"
+#include "ttimer.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
-#include "vnode.h"
+#include "tsync.h"
+#include "ttime.h"
+#include "ttimer.h"
+#include "tbalance.h"
#include "tglobal.h"
+#include "dnode.h"
+#include "vnode.h"
+#include "mnode.h"
#include "dnodeLog.h"
-#include "dnodeMClient.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
+#include "dnodeModule.h"
+
+#define MPEER_CONTENT_LEN 2000
+
+static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes);
+static bool dnodeReadMnodeInfos();
+static void dnodeSaveMnodeInfos();
+static void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg);
+static bool dnodeReadDnodeCfg();
+static void dnodeSaveDnodeCfg();
+static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
+static void dnodeSendStatusMsg(void *handle, void *tmrId);
+
+static void *tsDnodeTmr = NULL;
+static void *tsStatusTimer = NULL;
+static uint32_t tsRebootTime;
+
+static SRpcIpSet tsMnodeIpSet = {0};
+static SDMMnodeInfos tsMnodeInfos = {0};
+static SDMDnodeCfg tsDnodeCfg = {0};
static int32_t dnodeOpenVnodes();
static void dnodeCloseVnodes();
@@ -43,15 +71,59 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
+ dnodeReadDnodeCfg();
+ tsRebootTime = taosGetTimestampSec();
+
+ tsDnodeTmr = taosTmrInit(100, 200, 60000, "DND-DM");
+ if (tsDnodeTmr == NULL) {
+ dError("failed to init dnode timer");
+ return -1;
+ }
+
+ if (!dnodeReadMnodeInfos()) {
+ memset(&tsMnodeIpSet, 0, sizeof(SRpcIpSet));
+ memset(&tsMnodeInfos, 0, sizeof(SDMMnodeInfos));
+ tsMnodeIpSet.numOfIps = 1;
+ taosGetFqdnPortFromEp(tsFirst, tsMnodeIpSet.fqdn[0], &tsMnodeIpSet.port[0]);
+ tsMnodeIpSet.port[0] += TSDB_PORT_DNODEDNODE;
+ if (strcmp(tsSecond, tsFirst) != 0) {
+ tsMnodeIpSet.numOfIps = 2;
+ taosGetFqdnPortFromEp(tsSecond, tsMnodeIpSet.fqdn[1], &tsMnodeIpSet.port[1]);
+ tsMnodeIpSet.port[1] += TSDB_PORT_DNODEDNODE;
+ }
+ } else {
+ tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
+ tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
+ for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
+ taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
+ tsMnodeIpSet.port[i] += TSDB_PORT_DNODEDNODE;
+ }
+ }
+
int32_t code = dnodeOpenVnodes();
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
+ taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsDnodeTmr, &tsStatusTimer);
+
+ dPrint("dnode mgmt is initialized");
+
return TSDB_CODE_SUCCESS;
}
void dnodeCleanupMgmt() {
+ if (tsStatusTimer != NULL) {
+ taosTmrStopA(&tsStatusTimer);
+ tsStatusTimer = NULL;
+ }
+
+ if (tsDnodeTmr != NULL) {
+ taosTmrCleanUp(tsDnodeTmr);
+ tsDnodeTmr = NULL;
+ }
+
dnodeCloseVnodes();
}
@@ -193,3 +265,326 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
return taosCfgDynamicOptions(pCfg->config);
}
+
+
+void dnodeUpdateIpSet(void *ahandle, SRpcIpSet *pIpSet) {
+ dTrace("mgmt IP list is changed for ufp is called");
+ tsMnodeIpSet = *pIpSet;
+}
+
+void dnodeGetMnodeDnodeIpSet(void *ipSetRaw) {
+ SRpcIpSet *ipSet = ipSetRaw;
+ ipSet->numOfIps = tsMnodeInfos.nodeNum;
+ ipSet->inUse = tsMnodeInfos.inUse;
+ for (int32_t i = 0; i < tsMnodeInfos.nodeNum; ++i) {
+ taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, ipSet->fqdn[i], &ipSet->port[i]);
+ ipSet->port[i] += TSDB_PORT_DNODEDNODE;
+ }
+}
+
+static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
+ if (pMsg->code != TSDB_CODE_SUCCESS) {
+ dError("status rsp is received, error:%s", tstrerror(pMsg->code));
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ return;
+ }
+
+ SDMStatusRsp *pStatusRsp = pMsg->pCont;
+ SDMMnodeInfos *pMnodes = &pStatusRsp->mnodes;
+ if (pMnodes->nodeNum <= 0) {
+ dError("status msg is invalid, num of ips is %d", pMnodes->nodeNum);
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ return;
+ }
+
+ SDMDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
+ pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
+ pCfg->moduleStatus = htonl(pCfg->moduleStatus);
+ pCfg->dnodeId = htonl(pCfg->dnodeId);
+
+ for (int32_t i = 0; i < pMnodes->nodeNum; ++i) {
+ SDMMnodeInfo *pMnodeInfo = &pMnodes->nodeInfos[i];
+ pMnodeInfo->nodeId = htonl(pMnodeInfo->nodeId);
+ }
+
+ SDMVgroupAccess *pVgAcccess = pStatusRsp->vgAccess;
+ for (int32_t i = 0; i < pCfg->numOfVnodes; ++i) {
+ pVgAcccess[i].vgId = htonl(pVgAcccess[i].vgId);
+ }
+
+ dnodeProcessModuleStatus(pCfg->moduleStatus);
+ dnodeUpdateDnodeCfg(pCfg);
+ dnodeUpdateMnodeInfos(pMnodes);
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+}
+
+static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
+ bool mnodesChanged = (memcmp(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos)) != 0);
+ bool mnodesNotInit = (tsMnodeInfos.nodeNum == 0);
+ if (!(mnodesChanged || mnodesNotInit)) return;
+
+ memcpy(&tsMnodeInfos, pMnodes, sizeof(SDMMnodeInfos));
+
+ tsMnodeIpSet.inUse = tsMnodeInfos.inUse;
+ tsMnodeIpSet.numOfIps = tsMnodeInfos.nodeNum;
+ for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
+ taosGetFqdnPortFromEp(tsMnodeInfos.nodeInfos[i].nodeEp, tsMnodeIpSet.fqdn[i], &tsMnodeIpSet.port[i]);
+ tsMnodeIpSet.port[i] += TSDB_PORT_DNODEDNODE;
+ }
+
+ dPrint("mnodes is changed, nodeNum:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
+ for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
+ dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
+ }
+
+ dnodeSaveMnodeInfos();
+ sdbUpdateSync();
+}
+
+static bool dnodeReadMnodeInfos() {
+ char ipFile[TSDB_FILENAME_LEN] = {0};
+ sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
+ FILE *fp = fopen(ipFile, "r");
+ if (!fp) {
+ dTrace("failed to read mnode mgmtIpList.json, file not exist");
+ return false;
+ }
+
+ bool ret = false;
+ int maxLen = 2000;
+ char *content = calloc(1, maxLen + 1);
+ int len = fread(content, 1, maxLen, fp);
+ if (len <= 0) {
+ free(content);
+ fclose(fp);
+ dError("failed to read mnode mgmtIpList.json, content is null");
+ return false;
+ }
+
+ cJSON* root = cJSON_Parse(content);
+ if (root == NULL) {
+ dError("failed to read mnode mgmtIpList.json, invalid json format");
+ goto PARSE_OVER;
+ }
+
+ cJSON* inUse = cJSON_GetObjectItem(root, "inUse");
+ if (!inUse || inUse->type != cJSON_Number) {
+ dError("failed to read mnode mgmtIpList.json, inUse not found");
+ goto PARSE_OVER;
+ }
+ tsMnodeInfos.inUse = inUse->valueint;
+
+ cJSON* nodeNum = cJSON_GetObjectItem(root, "nodeNum");
+ if (!nodeNum || nodeNum->type != cJSON_Number) {
+ dError("failed to read mnode mgmtIpList.json, nodeNum not found");
+ goto PARSE_OVER;
+ }
+ tsMnodeInfos.nodeNum = nodeNum->valueint;
+
+ cJSON* nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
+ if (!nodeInfos || nodeInfos->type != cJSON_Array) {
+ dError("failed to read mnode mgmtIpList.json, nodeInfos not found");
+ goto PARSE_OVER;
+ }
+
+ int size = cJSON_GetArraySize(nodeInfos);
+ if (size != tsMnodeInfos.nodeNum) {
+ dError("failed to read mnode mgmtIpList.json, nodeInfos size not matched");
+ goto PARSE_OVER;
+ }
+
+ for (int i = 0; i < size; ++i) {
+ cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
+ if (nodeInfo == NULL) continue;
+
+ cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
+ if (!nodeId || nodeId->type != cJSON_Number) {
+ dError("failed to read mnode mgmtIpList.json, nodeId not found");
+ goto PARSE_OVER;
+ }
+ tsMnodeInfos.nodeInfos[i].nodeId = nodeId->valueint;
+
+ cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
+ if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
+ dError("failed to read mnode mgmtIpList.json, nodeName not found");
+ goto PARSE_OVER;
+ }
+ strncpy(tsMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_FQDN_LEN);
+ }
+
+ ret = true;
+
+ dPrint("read mnode iplist successed, numOfIps:%d inUse:%d", tsMnodeInfos.nodeNum, tsMnodeInfos.inUse);
+ for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
+ dPrint("mnode:%d, %s", tsMnodeInfos.nodeInfos[i].nodeId, tsMnodeInfos.nodeInfos[i].nodeEp);
+ }
+
+PARSE_OVER:
+ free(content);
+ cJSON_Delete(root);
+ fclose(fp);
+ return ret;
+}
+
+static void dnodeSaveMnodeInfos() {
+ char ipFile[TSDB_FILENAME_LEN] = {0};
+ sprintf(ipFile, "%s/mgmtIpList.json", tsDnodeDir);
+ FILE *fp = fopen(ipFile, "w");
+ if (!fp) return;
+
+ int32_t len = 0;
+ int32_t maxLen = 2000;
+ char * content = calloc(1, maxLen + 1);
+
+ len += snprintf(content + len, maxLen - len, "{\n");
+ len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsMnodeInfos.inUse);
+ len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsMnodeInfos.nodeNum);
+ len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
+ for (int32_t i = 0; i < tsMnodeInfos.nodeNum; i++) {
+ len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsMnodeInfos.nodeInfos[i].nodeId);
+ len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsMnodeInfos.nodeInfos[i].nodeEp);
+ if (i < tsMnodeInfos.nodeNum -1) {
+ len += snprintf(content + len, maxLen - len, " },{\n");
+ } else {
+ len += snprintf(content + len, maxLen - len, " }]\n");
+ }
+ }
+ len += snprintf(content + len, maxLen - len, "}\n");
+
+ fwrite(content, 1, len, fp);
+ fclose(fp);
+ free(content);
+
+ dPrint("save mnode iplist successed");
+}
+
+char *dnodeGetMnodeMasterEp() {
+ return tsMnodeInfos.nodeInfos[tsMnodeIpSet.inUse].nodeEp;
+}
+
+void* dnodeGetMnodeInfos() {
+ return &tsMnodeInfos;
+}
+
+static void dnodeSendStatusMsg(void *handle, void *tmrId) {
+ if (tsDnodeTmr == NULL) {
+ dError("dnode timer is already released");
+ return;
+ }
+
+ if (tsStatusTimer == NULL) {
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ dError("failed to start status timer");
+ return;
+ }
+
+ int32_t contLen = sizeof(SDMStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
+ SDMStatusMsg *pStatus = rpcMallocCont(contLen);
+ if (pStatus == NULL) {
+ taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsDnodeTmr, &tsStatusTimer);
+ dError("failed to malloc status message");
+ return;
+ }
+
+ //strcpy(pStatus->dnodeName, tsDnodeName);
+ pStatus->version = htonl(tsVersion);
+ pStatus->dnodeId = htonl(tsDnodeCfg.dnodeId);
+ strcpy(pStatus->dnodeEp, tsLocalEp);
+ pStatus->lastReboot = htonl(tsRebootTime);
+ pStatus->numOfTotalVnodes = htons((uint16_t) tsNumOfTotalVnodes);
+ pStatus->numOfCores = htons((uint16_t) tsNumOfCores);
+ pStatus->diskAvailable = tsAvailDataDirGB;
+ pStatus->alternativeRole = (uint8_t) tsAlternativeRole;
+
+ vnodeBuildStatusMsg(pStatus);
+ contLen = sizeof(SDMStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
+ pStatus->openVnodes = htons(pStatus->openVnodes);
+
+ SRpcMsg rpcMsg = {
+ .pCont = pStatus,
+ .contLen = contLen,
+ .msgType = TSDB_MSG_TYPE_DM_STATUS
+ };
+
+ dnodeSendMsgToDnode(&tsMnodeIpSet, &rpcMsg);
+}
+
+static bool dnodeReadDnodeCfg() {
+ char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
+ sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
+
+ FILE *fp = fopen(dnodeCfgFile, "r");
+ if (!fp) {
+ dTrace("failed to read dnodeCfg.json, file not exist");
+ return false;
+ }
+
+ bool ret = false;
+ int maxLen = 100;
+ char *content = calloc(1, maxLen + 1);
+ int len = fread(content, 1, maxLen, fp);
+ if (len <= 0) {
+ free(content);
+ fclose(fp);
+ dError("failed to read dnodeCfg.json, content is null");
+ return false;
+ }
+
+ cJSON* root = cJSON_Parse(content);
+ if (root == NULL) {
+ dError("failed to read dnodeCfg.json, invalid json format");
+ goto PARSE_CFG_OVER;
+ }
+
+ cJSON* dnodeId = cJSON_GetObjectItem(root, "dnodeId");
+ if (!dnodeId || dnodeId->type != cJSON_Number) {
+ dError("failed to read dnodeCfg.json, dnodeId not found");
+ goto PARSE_CFG_OVER;
+ }
+ tsDnodeCfg.dnodeId = dnodeId->valueint;
+
+ ret = true;
+
+ dPrint("read numOfVnodes successed, dnodeId:%d", tsDnodeCfg.dnodeId);
+
+PARSE_CFG_OVER:
+ free(content);
+ cJSON_Delete(root);
+ fclose(fp);
+ return ret;
+}
+
+static void dnodeSaveDnodeCfg() {
+ char dnodeCfgFile[TSDB_FILENAME_LEN] = {0};
+ sprintf(dnodeCfgFile, "%s/dnodeCfg.json", tsDnodeDir);
+
+ FILE *fp = fopen(dnodeCfgFile, "w");
+ if (!fp) return;
+
+ int32_t len = 0;
+ int32_t maxLen = 100;
+ char * content = calloc(1, maxLen + 1);
+
+ len += snprintf(content + len, maxLen - len, "{\n");
+ len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d\n", tsDnodeCfg.dnodeId);
+ len += snprintf(content + len, maxLen - len, "}\n");
+
+ fwrite(content, 1, len, fp);
+ fclose(fp);
+ free(content);
+
+ dPrint("save dnodeId successed");
+}
+
+void dnodeUpdateDnodeCfg(SDMDnodeCfg *pCfg) {
+ if (tsDnodeCfg.dnodeId == 0) {
+ dPrint("dnodeId is set to %d", pCfg->dnodeId);
+ tsDnodeCfg.dnodeId = pCfg->dnodeId;
+ dnodeSaveDnodeCfg();
+ }
+}
+
+int32_t dnodeGetDnodeId() {
+ return tsDnodeCfg.dnodeId;
+}
+
diff --git a/src/dnode/src/dnodeMnode.c b/src/dnode/src/dnodeServer.c
similarity index 73%
rename from src/dnode/src/dnodeMnode.c
rename to src/dnode/src/dnodeServer.c
index 75c09d43ba..169cd6cffa 100644
--- a/src/dnode/src/dnodeMnode.c
+++ b/src/dnode/src/dnodeServer.c
@@ -23,10 +23,10 @@
#include "dnodeWrite.h"
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *);
-static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg);
-static void *tsDnodeMnodeRpc = NULL;
+static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg);
+static void *tsDnodeServerRpc = NULL;
-int32_t dnodeInitMnode() {
+int32_t dnodeInitServer() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeWrite;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeWrite;
@@ -38,33 +38,35 @@ int32_t dnodeInitMnode() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
- rpcInit.localPort = tsDnodeMnodePort;
- rpcInit.label = "DND-MS";
+ rpcInit.localPort = tsDnodeDnodePort;
+ rpcInit.label = "DND-S";
rpcInit.numOfThreads = 1;
- rpcInit.cfp = dnodeProcessMsgFromMnode;
+ rpcInit.cfp = dnodeProcessReqMsgFromDnode;
rpcInit.sessions = 100;
rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 2000;
- tsDnodeMnodeRpc = rpcOpen(&rpcInit);
- if (tsDnodeMnodeRpc == NULL) {
- dError("failed to init mnode rpc server");
+ tsDnodeServerRpc = rpcOpen(&rpcInit);
+ if (tsDnodeServerRpc == NULL) {
+ dError("failed to init inter-dnodes RPC server");
return -1;
}
- dPrint("mnode rpc server is opened");
+ dPrint("inter-dnodes RPC server is opened");
return 0;
}
-void dnodeCleanupMnode() {
- if (tsDnodeMnodeRpc) {
- rpcClose(tsDnodeMnodeRpc);
- tsDnodeMnodeRpc = NULL;
- dPrint("mnode rpc server is closed");
+void dnodeCleanupServer() {
+ if (tsDnodeServerRpc) {
+ rpcClose(tsDnodeServerRpc);
+ tsDnodeServerRpc = NULL;
+ dPrint("inter-dnodes RPC server is closed");
}
}
-static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
+void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
+
+static void dnodeProcessReqMsgFromDnode(SRpcMsg *pMsg) {
SRpcMsg rspMsg;
rspMsg.handle = pMsg->handle;
rspMsg.pCont = NULL;
@@ -74,7 +76,7 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
rspMsg.code = TSDB_CODE_NOT_READY;
rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont);
- dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle);
+ dTrace("thandle:%p, query msg is ignored since dnode not running", pMsg->handle);
return;
}
@@ -83,15 +85,11 @@ static void dnodeProcessMsgFromMnode(SRpcMsg *pMsg) {
rpcSendResponse(&rspMsg);
return;
}
-
+
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
} else {
- dError("%s is not processed in dnode mserver", taosMsg[pMsg->msgType]);
- rspMsg.code = TSDB_CODE_MSG_NOT_PROCESSED;
- rpcSendResponse(&rspMsg);
- rpcFreeCont(pMsg->pCont);
+ mgmtProcessReqMsgFromDnode(pMsg);
}
}
-
diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c
index 581273d21c..e6a392a341 100644
--- a/src/dnode/src/dnodeShell.c
+++ b/src/dnode/src/dnodeShell.c
@@ -49,7 +49,7 @@ int32_t dnodeInitShell() {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
- rpcInit.localPort = tsMnodeShellPort;
+ rpcInit.localPort = tsDnodeShellPort;
rpcInit.label = "SHELL";
rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessMsgFromShell;
diff --git a/src/inc/dnode.h b/src/inc/dnode.h
index c4b893ab86..99b9046aac 100644
--- a/src/inc/dnode.h
+++ b/src/inc/dnode.h
@@ -20,6 +20,8 @@
extern "C" {
#endif
+#include "trpc.h"
+
typedef struct {
int32_t queryReqNum;
int32_t submitReqNum;
@@ -47,6 +49,10 @@ void dnodeGetMnodeDnodeIpSet(void *ipSet);
void * dnodeGetMnodeInfos();
int32_t dnodeGetDnodeId();
+void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
+void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
+void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/inc/mnode.h b/src/inc/mnode.h
index 21955e29c1..37fec24c20 100644
--- a/src/inc/mnode.h
+++ b/src/inc/mnode.h
@@ -26,7 +26,8 @@ void mgmtCleanUpSystem();
void mgmtStopSystem();
void sdbUpdateSync();
-void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
+void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg);
+void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h
index 95310ae44a..ec9debaf2b 100644
--- a/src/inc/taosdef.h
+++ b/src/inc/taosdef.h
@@ -330,9 +330,8 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_CHILD_TABLES 100000
#define TSDB_PORT_DNODESHELL 0
-#define TSDB_PORT_DNODEMNODE 10
-#define TSDB_PORT_MNODEDNODE 15
-#define TSDB_PORT_SYNC 20
+#define TSDB_PORT_DNODEDNODE 5
+#define TSDB_PORT_SYNC 10
#define TAOS_QTYPE_RPC 0
#define TAOS_QTYPE_FWD 1
diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c
index eeaeec83f2..1bc24c6c71 100644
--- a/src/kit/shell/src/shellEngine.c
+++ b/src/kit/shell/src/shellEngine.c
@@ -68,7 +68,7 @@ TAOS *shellInit(struct arguments *args) {
tsMeterMetaKeepTimer = 3000;
// Connect to the database.
- TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort);
+ TAOS *con = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort);
if (con == NULL) {
return con;
}
diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c
index e5c50bb74e..b29b96379b 100644
--- a/src/kit/shell/src/shellImport.c
+++ b/src/kit/shell/src/shellImport.c
@@ -229,7 +229,7 @@ static void shellRunImportThreads(struct arguments* args)
ShellThreadObj *pThread = threadObj + t;
pThread->threadIndex = t;
pThread->totalThreads = args->threadNum;
- pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMnodeShellPort);
+ pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsDnodeShellPort);
if (pThread->taos == NULL) {
fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos));
exit(0);
diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c
index 22ffa78c81..f5a1145cf8 100644
--- a/src/kit/shell/src/shellLinux.c
+++ b/src/kit/shell/src/shellLinux.c
@@ -63,7 +63,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break;
case 'P':
if (arg) {
- tsMnodeShellPort = atoi(arg);
+ tsDnodeShellPort = atoi(arg);
} else {
fprintf(stderr, "Invalid port\n");
return -1;
diff --git a/src/mnode/inc/mgmtDServer.h b/src/mnode/inc/mgmtServer.h
similarity index 93%
rename from src/mnode/inc/mgmtDServer.h
rename to src/mnode/inc/mgmtServer.h
index 937ae8f1ac..180e893cb0 100644
--- a/src/mnode/inc/mgmtDServer.h
+++ b/src/mnode/inc/mgmtServer.h
@@ -20,8 +20,8 @@
extern "C" {
#endif
-int32_t mgmtInitDServer();
-void mgmtCleanupDServer();
+int32_t mgmtInitServer();
+void mgmtCleanupServer();
void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
#ifdef __cplusplus
diff --git a/src/mnode/src/mgmtDClient.c b/src/mnode/src/mgmtDClient.c
deleted file mode 100644
index 229964e1d6..0000000000
--- a/src/mnode/src/mgmtDClient.c
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "taoserror.h"
-#include "tsched.h"
-#include "tsystem.h"
-#include "tutil.h"
-#include "tglobal.h"
-#include "dnode.h"
-#include "tgrant.h"
-#include "mgmtDef.h"
-#include "mgmtLog.h"
-#include "mgmtMnode.h"
-#include "mgmtDb.h"
-#include "mgmtDnode.h"
-#include "mgmtProfile.h"
-#include "mgmtShell.h"
-#include "mgmtTable.h"
-#include "mgmtVgroup.h"
-
-static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg);
-static void (*mgmtProcessDnodeRspFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
-static void *tsMgmtDClientRpc = NULL;
-
-int32_t mgmtInitDClient() {
- SRpcInit rpcInit = {0};
- rpcInit.localPort = 0;
- rpcInit.label = "MND-DC";
- rpcInit.numOfThreads = 1;
- rpcInit.cfp = mgmtProcessRspFromDnode;
- rpcInit.sessions = 100;
- rpcInit.connType = TAOS_CONN_CLIENT;
- rpcInit.idleTime = tsShellActivityTimer * 1000;
- rpcInit.user = "mgmtDClient";
- rpcInit.ckey = "key";
- rpcInit.secret = "secret";
-
- tsMgmtDClientRpc = rpcOpen(&rpcInit);
- if (tsMgmtDClientRpc == NULL) {
- mError("failed to init client connection to dnode");
- return -1;
- }
-
- mPrint("client connection to dnode is opened");
- return 0;
-}
-
-void mgmtCleanupDClient() {
- if (tsMgmtDClientRpc) {
- rpcClose(tsMgmtDClientRpc);
- tsMgmtDClientRpc = NULL;
- }
-}
-
-void mgmtAddDClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
- mgmtProcessDnodeRspFp[msgType] = fp;
-}
-
-void mgmtSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg) {
- rpcSendRequest(tsMgmtDClientRpc, ipSet, rpcMsg);
-}
-
-static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
- if (mgmtProcessDnodeRspFp[rpcMsg->msgType]) {
- (*mgmtProcessDnodeRspFp[rpcMsg->msgType])(rpcMsg);
- } else {
- mError("%s is not processed in mgmt dclient", taosMsg[rpcMsg->msgType]);
- SRpcMsg rpcRsp = {.pCont = 0, .contLen = 0, .code = TSDB_CODE_OPS_NOT_SUPPORT, .handle = rpcMsg->handle};
- rpcSendResponse(&rpcRsp);
- }
-
- rpcFreeCont(rpcMsg->pCont);
-}
diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c
index 90ee40dc93..33c012f079 100644
--- a/src/mnode/src/mgmtDnode.c
+++ b/src/mnode/src/mgmtDnode.c
@@ -27,8 +27,6 @@
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
-#include "mgmtDClient.h"
-#include "mgmtDServer.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
@@ -152,7 +150,7 @@ int32_t mgmtInitDnodes() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CREATE_DNODE, mgmtProcessCreateDnodeMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_DROP_DNODE, mgmtProcessDropDnodeMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_CONFIG_DNODE, mgmtProcessCfgDnodeMsg);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP, mgmtProcessCfgDnodeMsgRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_STATUS, mgmtProcessDnodeStatusMsg);
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_MODULE, mgmtGetModuleMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_MODULE, mgmtRetrieveModules);
@@ -241,7 +239,7 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
.pCont = pMdCfgDnode,
.contLen = sizeof(SMDCfgDnodeMsg)
};
- mgmtSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
+ dnodeSendMsgToDnode(&ipSet, &rpcMdCfgDnodeMsg);
rpcRsp.code = TSDB_CODE_SUCCESS;
}
diff --git a/src/mnode/src/mgmtMain.c b/src/mnode/src/mgmtMain.c
index 0f18c95539..aa95381df3 100644
--- a/src/mnode/src/mgmtMain.c
+++ b/src/mnode/src/mgmtMain.c
@@ -24,12 +24,11 @@
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
+#include "mgmtServer.h"
#include "mgmtAcct.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
-#include "mgmtDClient.h"
-#include "mgmtDServer.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
#include "mgmtUser.h"
@@ -100,11 +99,7 @@ int32_t mgmtStartSystem() {
mError("failed to init balance")
}
- if (mgmtInitDClient() < 0) {
- return -1;
- }
-
- if (mgmtInitDServer() < 0) {
+ if (mgmtInitServer() < 0) {
return -1;
}
@@ -141,8 +136,7 @@ void mgmtCleanUpSystem() {
mgmtCleanupMnodes();
balanceCleanUp();
mgmtCleanUpShell();
- mgmtCleanupDClient();
- mgmtCleanupDServer();
+ mgmtCleanupServer();
mgmtCleanUpAccts();
mgmtCleanUpTables();
mgmtCleanUpVgroups();
diff --git a/src/mnode/src/mgmtDServer.c b/src/mnode/src/mgmtServer.c
similarity index 55%
rename from src/mnode/src/mgmtDServer.c
rename to src/mnode/src/mgmtServer.c
index 726554e490..c2b07a3f4e 100644
--- a/src/mnode/src/mgmtDServer.c
+++ b/src/mnode/src/mgmtServer.c
@@ -27,7 +27,6 @@
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
-#include "mgmtDServer.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
@@ -35,45 +34,21 @@
#include "mgmtTable.h"
#include "mgmtVgroup.h"
-static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg);
-static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey);
static void (*mgmtProcessDnodeMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *rpcMsg);
-static void *tsMgmtDServerRpc;
-static void *tsMgmtDServerQhandle = NULL;
+static void *tsMgmtServerQhandle = NULL;
-int32_t mgmtInitDServer() {
- SRpcInit rpcInit = {0};
- rpcInit.localPort = tsMnodeDnodePort;
- rpcInit.label = "MND-DS";
- rpcInit.numOfThreads = 1;
- rpcInit.cfp = mgmtProcessMsgFromDnode;
- rpcInit.sessions = 100;
- rpcInit.connType = TAOS_CONN_SERVER;
- rpcInit.idleTime = tsShellActivityTimer * 1000;
- rpcInit.afp = mgmtDServerRetrieveAuth;
+int32_t mgmtInitServer() {
- tsMgmtDServerRpc = rpcOpen(&rpcInit);
- if (tsMgmtDServerRpc == NULL) {
- mError("failed to init server connection to dnode");
- return -1;
- }
-
- tsMgmtDServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS");
+ tsMgmtServerQhandle = taosInitScheduler(tsMaxShellConns, 1, "MS");
mPrint("server connection to dnode is opened");
return 0;
}
-void mgmtCleanupDServer() {
- if (tsMgmtDServerQhandle) {
- taosCleanUpScheduler(tsMgmtDServerQhandle);
- tsMgmtDServerQhandle = NULL;
- }
-
- if (tsMgmtDServerRpc) {
- rpcClose(tsMgmtDServerRpc);
- tsMgmtDServerRpc = NULL;
- mPrint("server connection to dnode is closed");
+void mgmtCleanupServer() {
+ if (tsMgmtServerQhandle) {
+ taosCleanUpScheduler(tsMgmtServerQhandle);
+ tsMgmtServerQhandle = NULL;
}
}
@@ -81,21 +56,27 @@ void mgmtAddDServerMsgHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)) {
mgmtProcessDnodeMsgFp[msgType] = fp;
}
-static void mgmtProcessDServerRequest(SSchedMsg *sched) {
+static void mgmtProcessRequestFromDnode(SSchedMsg *sched) {
SRpcMsg *pMsg = sched->msg;
(*mgmtProcessDnodeMsgFp[pMsg->msgType])(pMsg);
rpcFreeCont(pMsg->pCont);
free(pMsg);
}
-static void mgmtAddToDServerQueue(SRpcMsg *pMsg) {
+static void mgmtAddToServerQueue(SRpcMsg *pMsg) {
SSchedMsg schedMsg;
schedMsg.msg = pMsg;
- schedMsg.fp = mgmtProcessDServerRequest;
- taosScheduleTask(tsMgmtDServerQhandle, &schedMsg);
+ schedMsg.fp = mgmtProcessRequestFromDnode;
+ taosScheduleTask(tsMgmtServerQhandle, &schedMsg);
}
-static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
+void mgmtProcessReqMsgFromDnode(SRpcMsg *rpcMsg) {
+ if (mgmtProcessDnodeMsgFp[rpcMsg->msgType] == NULL) {
+ mError("%s is not processed in mnode", taosMsg[rpcMsg->msgType]);
+ mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED);
+ rpcFreeCont(rpcMsg->pCont);
+ }
+
if (rpcMsg->pCont == NULL) {
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_LEN);
return;
@@ -116,17 +97,8 @@ static void mgmtProcessMsgFromDnode(SRpcMsg *rpcMsg) {
return;
}
- if (mgmtProcessDnodeMsgFp[rpcMsg->msgType]) {
- SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
- memcpy(pMsg, rpcMsg, sizeof(SRpcMsg));
- mgmtAddToDServerQueue(pMsg);
- } else {
- mError("%s is not processed in mgmt dserver", taosMsg[rpcMsg->msgType]);
- mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_MSG_NOT_PROCESSED);
- rpcFreeCont(rpcMsg->pCont);
- }
+ SRpcMsg *pMsg = malloc(sizeof(SRpcMsg));
+ memcpy(pMsg, rpcMsg, sizeof(SRpcMsg));
+ mgmtAddToServerQueue(pMsg);
}
-static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
- return TSDB_CODE_SUCCESS;
-}
diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c
index 6ed19b3d11..c6d973fd81 100644
--- a/src/mnode/src/mgmtTable.c
+++ b/src/mnode/src/mgmtTable.c
@@ -24,13 +24,12 @@
#include "tname.h"
#include "tidpool.h"
#include "tglobal.h"
+#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
-#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
-#include "mgmtDServer.h"
#include "tgrant.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
@@ -538,10 +537,10 @@ int32_t mgmtInitTables() {
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_TABLE_META, mgmtProcessTableMetaMsg);
mgmtAddShellMsgHandle(TSDB_MSG_TYPE_CM_STABLE_VGROUP, mgmtProcessSuperTableVgroupMsg);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP, mgmtProcessCreateChildTableRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_TABLE_RSP, mgmtProcessDropChildTableRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_STABLE_RSP, mgmtProcessDropSuperTableRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP, mgmtProcessAlterTableRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_TABLE, mgmtProcessTableCfgMsg);
@@ -810,7 +809,7 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
if (pVgroup != NULL) {
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
- mgmtSendMsgToDnode(&ipSet, &rpcMsg);
+ dnodeSendMsgToDnode(&ipSet, &rpcMsg);
mgmtDecVgroupRef(pVgroup);
}
}
@@ -1487,7 +1486,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
- mgmtSendMsgToDnode(&ipSet, &rpcMsg);
+ dnodeSendMsgToDnode(&ipSet, &rpcMsg);
}
static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
@@ -1525,7 +1524,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
.msgType = TSDB_MSG_TYPE_MD_DROP_TABLE
};
- mgmtSendMsgToDnode(&ipSet, &rpcMsg);
+ dnodeSendMsgToDnode(&ipSet, &rpcMsg);
}
static int32_t mgmtModifyChildTableTagValue(SChildTableObj *pTable, char *tagName, char *nContent) {
@@ -1827,7 +1826,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
- mgmtSendMsgToDnode(&ipSet, &rpcRsp);
+ dnodeSendMsgToDnode(&ipSet, &rpcRsp);
mgmtDecTableRef(pTable);
mgmtDecDnodeRef(pDnode);
diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c
index d8007d000d..4b455de41a 100644
--- a/src/mnode/src/mgmtVgroup.c
+++ b/src/mnode/src/mgmtVgroup.c
@@ -23,11 +23,10 @@
#include "ttime.h"
#include "tbalance.h"
#include "tglobal.h"
+#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
-#include "mgmtDClient.h"
-#include "mgmtDServer.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
@@ -220,8 +219,8 @@ int32_t mgmtInitVgroups() {
mgmtAddShellShowMetaHandle(TSDB_MGMT_TABLE_VGROUP, mgmtGetVgroupMeta);
mgmtAddShellShowRetrieveHandle(TSDB_MGMT_TABLE_VGROUP, mgmtRetrieveVgroups);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
- mgmtAddDClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mgmtProcessCreateVnodeRsp);
+ dnodeAddClientRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mgmtProcessDropVnodeRsp);
mgmtAddDServerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mgmtProcessVnodeCfgMsg);
mTrace("table:vgroups is created");
@@ -583,7 +582,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
};
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
strcpy(ipSet.fqdn[i], pVgroup->vnodeGid[i].pDnode->dnodeFqdn);
- ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEMNODE;
+ ipSet.port[i] = pVgroup->vnodeGid[i].pDnode->dnodePort + TSDB_PORT_DNODEDNODE;
}
return ipSet;
}
@@ -594,7 +593,7 @@ SRpcIpSet mgmtGetIpSetFromIp(char *ep) {
ipSet.numOfIps = 1;
ipSet.inUse = 0;
taosGetFqdnPortFromEp(ep, ipSet.fqdn[0], &ipSet.port[0]);
- ipSet.port[0] += TSDB_PORT_DNODEMNODE;
+ ipSet.port[0] += TSDB_PORT_DNODEDNODE;
return ipSet;
}
@@ -608,7 +607,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_VNODE
};
- mgmtSendMsgToDnode(ipSet, &rpcMsg);
+ dnodeSendMsgToDnode(ipSet, &rpcMsg);
}
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
@@ -674,7 +673,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_DROP_VNODE
};
- mgmtSendMsgToDnode(ipSet, &rpcMsg);
+ dnodeSendMsgToDnode(ipSet, &rpcMsg);
}
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {