diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h
index 037199d3bd..561a81167f 100644
--- a/include/common/taosmsg.h
+++ b/include/common/taosmsg.h
@@ -657,15 +657,18 @@ typedef struct SVgroupAccess {
} SVgroupAccess;
typedef struct {
- int32_t dnodeId;
- uint32_t moduleStatus;
- uint32_t numOfVnodes;
- char clusterId[TSDB_CLUSTER_ID_LEN];
- char reserved[16];
+ int32_t dnodeId;
+ int8_t dropped;
+ char reserved[19];
+ int64_t clusterId;
+ int32_t numOfDnodes;
+ int32_t numOfVnodes;
} SDnodeCfg;
typedef struct {
int32_t dnodeId;
+ int8_t isMnode;
+ int8_t reserved;
uint16_t dnodePort;
char dnodeFqdn[TSDB_FQDN_LEN];
} SDnodeEp;
@@ -676,55 +679,29 @@ typedef struct {
} SDnodeEps;
typedef struct {
- int32_t mnodeId;
- char mnodeEp[TSDB_EP_LEN];
-} SMInfo;
-
-typedef struct SMInfos {
- int8_t inUse;
- int8_t mnodeNum;
- SMInfo mnodeInfos[TSDB_MAX_REPLICA];
-} SMInfos;
-
-typedef struct {
- int32_t numOfMnodes; // tsNumOfMnodes
- int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum
- int32_t offlineThreshold; // tsOfflineThreshold
- int32_t statusInterval; // tsStatusInterval
- int32_t maxtablesPerVnode;
- int32_t maxVgroupsPerDb;
- char arbitrator[TSDB_EP_LEN]; // tsArbitrator
- char reserve[2]; // to solve arm32 bus error
- char timezone[64]; // tsTimezone
- int64_t checkTime; // 1970-01-01 00:00:00.000
- char locale[TSDB_LOCALE_LEN]; // tsLocale
- char charset[TSDB_LOCALE_LEN]; // tsCharset
- int8_t enableBalance; // tsEnableBalance
- int8_t flowCtrl;
- int8_t slaveQuery;
- int8_t adjustMaster;
- int8_t reserved[4];
+ int32_t statusInterval; // tsStatusInterval
+ int8_t reserved[36];
+ int64_t checkTime; // 1970-01-01 00:00:00.000
+ char timezone[64]; // tsTimezone
+ char locale[TSDB_LOCALE_LEN]; // tsLocale
+ char charset[TSDB_LOCALE_LEN]; // tsCharset
} SClusterCfg;
typedef struct SStatusMsg {
uint32_t version;
int32_t dnodeId;
+ uint32_t lastReboot; // time stamp for last reboot
+ int32_t openVnodes;
+ int32_t numOfCores;
+ float diskAvailable;
+ int8_t reserved[36];
char dnodeEp[TSDB_EP_LEN];
- uint32_t moduleStatus;
- uint32_t lastReboot; // time stamp for last reboot
- uint16_t reserve1; // from config file
- uint16_t openVnodes;
- uint16_t numOfCores;
- float diskAvailable; // GB
- char clusterId[TSDB_CLUSTER_ID_LEN];
- uint8_t alternativeRole;
- uint8_t reserve2[15];
+ int64_t clusterId;
SClusterCfg clusterCfg;
SVnodeLoad load[];
} SStatusMsg;
typedef struct {
- SMInfos mnodes;
SDnodeCfg dnodeCfg;
SVgroupAccess vgAccess[];
} SStatusRsp;
@@ -860,9 +837,9 @@ typedef struct {
} SCreateDnodeMsg, SDropDnodeMsg;
typedef struct {
- int32_t dnodeId;
- char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
- SMInfos mnodes;
+ int32_t dnodeId;
+ int32_t mnodeNum;
+ SDnodeEp mnodeEps[];
} SCreateMnodeMsg;
typedef struct {
diff --git a/include/server/dnode/dnode.h b/include/server/dnode/dnode.h
index d7aaa0e008..3499913afa 100644
--- a/include/server/dnode/dnode.h
+++ b/include/server/dnode/dnode.h
@@ -65,7 +65,7 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
-void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
+void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
#ifdef __cplusplus
}
diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h
index f15fc1792a..9bbc2e4b10 100644
--- a/include/server/mnode/mnode.h
+++ b/include/server/mnode/mnode.h
@@ -60,7 +60,7 @@ typedef struct {
typedef struct {
SMnodeFp fp;
- char clusterId[TSDB_CLUSTER_ID_LEN];
+ int64_t clusterId;
int32_t dnodeId;
} SMnodePara;
diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt
index 2313c1633c..bc0d439407 100644
--- a/source/client/CMakeLists.txt
+++ b/source/client/CMakeLists.txt
@@ -7,4 +7,4 @@ target_include_directories(
target_link_libraries(
taos
INTERFACE api
-)
\ No newline at end of file
+)
diff --git a/source/client/src/client.c b/source/client/src/client.c
index 77c4aa1b2d..b1663239e6 100644
--- a/source/client/src/client.c
+++ b/source/client/src/client.c
@@ -19,3 +19,5 @@
//
//}
+int taos_init() { return 0; }
+void taos_cleanup(void) {}
diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c
index 35611f3da8..4b3ca11e4b 100644
--- a/source/libs/sync/src/sync.c
+++ b/source/libs/sync/src/sync.c
@@ -13,4 +13,7 @@
* along with this program. If not, see .
*/
-#include "sync.h"
\ No newline at end of file
+#include "sync.h"
+
+int32_t syncInit() {return 0;}
+void syncCleanUp() {}
\ No newline at end of file
diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c
index 6dea4a4e57..1e74c1613c 100644
--- a/source/libs/wal/src/wal.c
+++ b/source/libs/wal/src/wal.c
@@ -11,4 +11,9 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
- */
\ No newline at end of file
+ */
+
+#include "wal.h"
+
+int32_t walInit() {return 0;}
+void walCleanUp() {}
\ No newline at end of file
diff --git a/source/server/dnode/CMakeLists.txt b/source/server/dnode/CMakeLists.txt
index e627ca94e8..e1462ab3c8 100644
--- a/source/server/dnode/CMakeLists.txt
+++ b/source/server/dnode/CMakeLists.txt
@@ -5,6 +5,9 @@ target_link_libraries(
PUBLIC cjson
PUBLIC mnode
PUBLIC vnode
+ PUBLIC wal
+ PUBLIC sync
+ PUBLIC taos
)
target_include_directories(
dnode
diff --git a/source/server/dnode/inc/dnodeDnodeEps.h b/source/server/dnode/inc/dnodeDnodeEps.h
deleted file mode 100644
index 01023da7fc..0000000000
--- a/source/server/dnode/inc/dnodeDnodeEps.h
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_DNODE_DNODE_EPS_H_
-#define _TD_DNODE_DNODE_EPS_H_
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-#include "dnodeInt.h"
-
-int32_t dnodeInitDnodeEps();
-void dnodeCleanupDnodeEps();
-void dnodeUpdateDnodeEps(SDnodeEps *data);
-bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr);
-void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_DNODE_DNODE_EPS_H_*/
\ No newline at end of file
diff --git a/source/server/dnode/inc/dnodeCfg.h b/source/server/dnode/inc/dnodeEps.h
similarity index 62%
rename from source/server/dnode/inc/dnodeCfg.h
rename to source/server/dnode/inc/dnodeEps.h
index eda6231579..ac68d16374 100644
--- a/source/server/dnode/inc/dnodeCfg.h
+++ b/source/server/dnode/inc/dnodeEps.h
@@ -13,25 +13,30 @@
* along with this program. If not, see .
*/
-#ifndef _TD_DNODE_CFG_H_
-#define _TD_DNODE_CFG_H_
+#ifndef _TD_DNODE_EPS_H_
+#define _TD_DNODE_EPS_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "dnodeInt.h"
+int32_t dnodeInitEps();
+void dnodeCleanupEps();
-int32_t dnodeInitCfg();
-void dnodeCleanupCfg();
void dnodeUpdateCfg(SDnodeCfg *data);
+void dnodeUpdateDnodeEps(SDnodeEps *data);
+void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet);
int32_t dnodeGetDnodeId();
-void dnodeGetClusterId(char *clusterId);
-void dnodeGetCfg(int32_t *dnodeId, char *clusterId);
-void dnodeSetDropped();
+int64_t dnodeGetClusterId();
+void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
+
+void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
+void dnodeGetEpSetForShell(SRpcEpSet *epSet);
+void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus
}
#endif
-#endif /*_TD_DNODE_CFG_H_*/
+#endif /*_TD_DNODE_EPS_H_*/
\ No newline at end of file
diff --git a/source/server/dnode/inc/dnodeInt.h b/source/server/dnode/inc/dnodeInt.h
index 38eac2794e..85f474a391 100644
--- a/source/server/dnode/inc/dnodeInt.h
+++ b/source/server/dnode/inc/dnodeInt.h
@@ -24,6 +24,7 @@ extern "C" {
#include "tglobal.h"
#include "tlog.h"
#include "trpc.h"
+#include "ttimer.h"
#include "dnode.h"
extern int32_t dDebugFlag;
@@ -35,6 +36,12 @@ extern int32_t dDebugFlag;
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }}
+typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat;
+
+EDnStat dnodeGetRunStat();
+void dnodeSetRunStat();
+void dnodeGetStartup(SStartupStep *);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/server/dnode/inc/dnodeMain.h b/source/server/dnode/inc/dnodeMain.h
deleted file mode 100644
index b3aeeceb44..0000000000
--- a/source/server/dnode/inc/dnodeMain.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_DNODE_MAIN_H_
-#define _TD_DNODE_MAIN_H_
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-#include "dnodeInt.h"
-
-typedef enum {
- DN_RUN_STAT_INIT,
- DN_RUN_STAT_RUNNING,
- DN_RUN_STAT_STOPPED
-} EDnStat;
-
-int32_t dnodeInitMain();
-void dnodeCleanupMain();
-int32_t dnodeInitStorage();
-void dnodeCleanupStorage();
-void dnodeReportStartup(char *name, char *desc);
-void dnodeReportStartupFinished(char *name, char *desc);
-void dnodeProcessStartupReq(SRpcMsg *pMsg);
-void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
-void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
-EDnStat dnodeGetRunStat();
-void dnodeSetRunStat();
-void* dnodeGetTimer();
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_DNODE_MAIN_H_*/
diff --git a/source/server/dnode/inc/dnodeMnodeEps.h b/source/server/dnode/inc/dnodeMnodeEps.h
deleted file mode 100644
index c890f6921d..0000000000
--- a/source/server/dnode/inc/dnodeMnodeEps.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_DNODE_MNODE_EP_H_
-#define _TD_DNODE_MNODE_EP_H_
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-#include "dnodeInt.h"
-
-int32_t dnodeInitMnodeEps();
-void dnodeCleanupMnodeEps();
-void dnodeUpdateMnodeFromStatus(SMInfos *pMinfos);
-void dnodeUpdateMnodeFromPeer(SRpcEpSet *pEpSet);
-void dnodeGetEpSetForPeer(SRpcEpSet *epSet);
-void dnodeGetEpSetForShell(SRpcEpSet *epSet);
-void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_DNODE_MNODE_EP_H_*/
diff --git a/source/server/dnode/inc/dnodeStatus.h b/source/server/dnode/inc/dnodeMsg.h
similarity index 81%
rename from source/server/dnode/inc/dnodeStatus.h
rename to source/server/dnode/inc/dnodeMsg.h
index f0473b93f1..0790fa7e3e 100644
--- a/source/server/dnode/inc/dnodeStatus.h
+++ b/source/server/dnode/inc/dnodeMsg.h
@@ -21,9 +21,12 @@ extern "C" {
#endif
#include "dnodeInt.h"
-int32_t dnodeInitStatus();
-void dnodeCleanupStatus();
+int32_t dnodeInitMsg();
+void dnodeCleanupMsg();
void dnodeProcessStatusRsp(SRpcMsg *pMsg);
+void dnodeProcessStartupReq(SRpcMsg *pMsg);
+void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
+void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
#ifdef __cplusplus
}
diff --git a/source/server/dnode/src/dnodeCfg.c b/source/server/dnode/src/dnodeCfg.c
deleted file mode 100644
index f9ed491464..0000000000
--- a/source/server/dnode/src/dnodeCfg.c
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "cJSON.h"
-#include "tglobal.h"
-#include "dnodeCfg.h"
-
-static struct DnCfg {
- int32_t dnodeId;
- int32_t dropped;
- char clusterId[TSDB_CLUSTER_ID_LEN];
- char file[PATH_MAX + 20];
- pthread_mutex_t mutex;
-} tsDcfg;
-
-static int32_t dnodeReadCfg() {
- int32_t len = 0;
- int32_t maxLen = 200;
- char * content = calloc(1, maxLen + 1);
- cJSON * root = NULL;
- FILE * fp = NULL;
-
- fp = fopen(tsDcfg.file, "r");
- if (!fp) {
- dDebug("file %s not exist", tsDcfg.file);
- goto PARSE_CFG_OVER;
- }
-
- len = (int32_t)fread(content, 1, maxLen, fp);
- if (len <= 0) {
- dError("failed to read %s since content is null", tsDcfg.file);
- goto PARSE_CFG_OVER;
- }
-
- content[len] = 0;
- root = cJSON_Parse(content);
- if (root == NULL) {
- dError("failed to read %s since invalid json format", tsDcfg.file);
- goto PARSE_CFG_OVER;
- }
-
- cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
- if (!dnodeId || dnodeId->type != cJSON_Number) {
- dError("failed to read %s since dnodeId not found", tsDcfg.file);
- goto PARSE_CFG_OVER;
- }
- tsDcfg.dnodeId = (int32_t)dnodeId->valueint;
-
- cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
- if (!dropped || dropped->type != cJSON_Number) {
- dError("failed to read %s since dropped not found", tsDcfg.file);
- goto PARSE_CFG_OVER;
- }
- tsDcfg.dropped = (int32_t)dropped->valueint;
-
- cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
- if (!clusterId || clusterId->type != cJSON_String) {
- dError("failed to read %s since clusterId not found", tsDcfg.file);
- goto PARSE_CFG_OVER;
- }
- tstrncpy(tsDcfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN);
-
- dInfo("successed to read %s", tsDcfg.file);
-
-PARSE_CFG_OVER:
- if (content != NULL) free(content);
- if (root != NULL) cJSON_Delete(root);
- if (fp != NULL) fclose(fp);
- terrno = 0;
-
- return 0;
-}
-
-static int32_t dnodeWriteCfg() {
- FILE *fp = fopen(tsDcfg.file, "w");
- if (!fp) {
- dError("failed to write %s since %s", tsDcfg.file, strerror(errno));
- return -1;
- }
-
- int32_t len = 0;
- int32_t maxLen = 200;
- char * content = calloc(1, maxLen + 1);
-
- len += snprintf(content + len, maxLen - len, "{\n");
- len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsDcfg.dnodeId);
- len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", tsDcfg.dropped);
- len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsDcfg.clusterId);
- len += snprintf(content + len, maxLen - len, "}\n");
-
- fwrite(content, 1, len, fp);
- taosFsyncFile(fileno(fp));
- fclose(fp);
- free(content);
- terrno = 0;
-
- dInfo("successed to write %s", tsDcfg.file);
- return 0;
-}
-
-int32_t dnodeInitCfg() {
- tsDcfg.dnodeId = 0;
- tsDcfg.dropped = 0;
- tsDcfg.clusterId[0] = 0;
- snprintf(tsDcfg.file, sizeof(tsDcfg.file), "%s/dnodeCfg.json", tsDnodeDir);
- pthread_mutex_init(&tsDcfg.mutex, NULL);
-
- int32_t ret = dnodeReadCfg();
- if (ret == 0) {
- dInfo("dnode cfg is initialized");
- }
-
- if (tsDcfg.dropped) {
- dInfo("dnode is dropped and start to exit");
- return -1;
- }
-
- return ret;
-}
-
-void dnodeCleanupCfg() {
- pthread_mutex_destroy(&tsDcfg.mutex);
-}
-
-void dnodeUpdateCfg(SDnodeCfg *data) {
- if (tsDcfg.dnodeId != 0) return;
-
- pthread_mutex_lock(&tsDcfg.mutex);
-
- tsDcfg.dnodeId = data->dnodeId;
- tstrncpy(tsDcfg.clusterId, data->clusterId, TSDB_CLUSTER_ID_LEN);
- dInfo("dnodeId is set to %d, clusterId is set to %s", data->dnodeId, data->clusterId);
-
- dnodeWriteCfg();
- pthread_mutex_unlock(&tsDcfg.mutex);
-}
-
-void dnodeSetDropped() {
- pthread_mutex_lock(&tsDcfg.mutex);
- tsDcfg.dropped = 1;
- dnodeWriteCfg();
- pthread_mutex_unlock(&tsDcfg.mutex);
-}
-
-int32_t dnodeGetDnodeId() {
- int32_t dnodeId = 0;
- pthread_mutex_lock(&tsDcfg.mutex);
- dnodeId = tsDcfg.dnodeId;
- pthread_mutex_unlock(&tsDcfg.mutex);
- return dnodeId;
-}
-
-void dnodeGetClusterId(char *clusterId) {
- pthread_mutex_lock(&tsDcfg.mutex);
- tstrncpy(clusterId, tsDcfg.clusterId, TSDB_CLUSTER_ID_LEN);
- pthread_mutex_unlock(&tsDcfg.mutex);
-}
-
-void dnodeGetCfg(int32_t *dnodeId, char *clusterId) {
- pthread_mutex_lock(&tsDcfg.mutex);
- *dnodeId = tsDcfg.dnodeId;
- tstrncpy(clusterId, tsDcfg.clusterId, TSDB_CLUSTER_ID_LEN);
- pthread_mutex_unlock(&tsDcfg.mutex);
-}
diff --git a/source/server/dnode/src/dnodeDnodeEps.c b/source/server/dnode/src/dnodeDnodeEps.c
deleted file mode 100644
index 15c084439f..0000000000
--- a/source/server/dnode/src/dnodeDnodeEps.c
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "cJSON.h"
-#include "thash.h"
-#include "tglobal.h"
-#include "dnodeDnodeEps.h"
-#include "dnodeCfg.h"
-
-static struct {
- int32_t dnodeId;
- int32_t dnodeNum;
- SDnodeEp * dnodeList;
- SHashObj * dnodeHash;
- char file[PATH_MAX + 20];
- pthread_mutex_t mutex;
-} tsDeps;
-
-static void dnodePrintEps() {
- dDebug("print dnodeEp, dnodeNum:%d", tsDeps.dnodeNum);
- for (int32_t i = 0; i < tsDeps.dnodeNum; i++) {
- SDnodeEp *ep = &tsDeps.dnodeList[i];
- dDebug("dnode:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort);
- }
-}
-
-static void dnodeResetEps(SDnodeEps *data) {
- assert(data != NULL);
-
- if (data->dnodeNum > tsDeps.dnodeNum) {
- SDnodeEp *tmp = calloc(data->dnodeNum, sizeof(SDnodeEp));
- if (tmp == NULL) return;
-
- tfree(tsDeps.dnodeList);
- tsDeps.dnodeList = tmp;
- tsDeps.dnodeNum = data->dnodeNum;
- memcpy(tsDeps.dnodeList, data->dnodeEps, tsDeps.dnodeNum * sizeof(SDnodeEp));
- dnodePrintEps();
-
- for (int32_t i = 0; i < tsDeps.dnodeNum; ++i) {
- SDnodeEp *ep = &tsDeps.dnodeList[i];
- taosHashPut(tsDeps.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
- }
- }
-}
-
-static int32_t dnodeReadEps() {
- int32_t len = 0;
- int32_t maxLen = 30000;
- char * content = calloc(1, maxLen + 1);
- cJSON * root = NULL;
- FILE * fp = NULL;
-
- fp = fopen(tsDeps.file, "r");
- if (!fp) {
- dDebug("file %s not exist", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
-
- len = (int32_t)fread(content, 1, maxLen, fp);
- if (len <= 0) {
- dError("failed to read %s since content is null", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
-
- content[len] = 0;
- root = cJSON_Parse(content);
- if (root == NULL) {
- dError("failed to read %s since invalid json format", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
-
- cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum");
- if (!dnodeNum || dnodeNum->type != cJSON_Number) {
- dError("failed to read %s since dnodeNum not found", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
-
- cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
- if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
- dError("failed to read %s since dnodeInfos not found", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
-
- int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
- if (dnodeInfosSize != dnodeNum->valueint) {
- dError("failed to read %s since dnodeInfos size:%d not matched dnodeNum:%d", tsDeps.file, dnodeInfosSize,
- (int32_t)dnodeNum->valueint);
- goto PRASE_EPS_OVER;
- }
-
- tsDeps.dnodeNum = dnodeInfosSize;
- tsDeps.dnodeList = calloc(dnodeInfosSize, sizeof(SDnodeEp));
- if (tsDeps.dnodeList == NULL) {
- dError("failed to calloc dnodeEpList since %s", strerror(errno));
- goto PRASE_EPS_OVER;
- }
-
- for (int32_t i = 0; i < dnodeInfosSize; ++i) {
- cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
- if (dnodeInfo == NULL) break;
-
- SDnodeEp *ep = &tsDeps.dnodeList[i];
-
- cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
- if (!dnodeId || dnodeId->type != cJSON_Number) {
- dError("failed to read %s, dnodeId not found", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
- ep->dnodeId = (int32_t)dnodeId->valueint;
-
- cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
- if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
- dError("failed to read %s, dnodeFqdn not found", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
- tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
-
- cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
- if (!dnodePort || dnodePort->type != cJSON_Number) {
- dError("failed to read %s, dnodePort not found", tsDeps.file);
- goto PRASE_EPS_OVER;
- }
- ep->dnodePort = (uint16_t)dnodePort->valueint;
- }
-
- dInfo("succcessed to read file %s", tsDeps.file);
- dnodePrintEps();
-
-PRASE_EPS_OVER:
- if (content != NULL) free(content);
- if (root != NULL) cJSON_Delete(root);
- if (fp != NULL) fclose(fp);
-
- if (dnodeIsDnodeEpChanged(tsDeps.dnodeId, tsLocalEp)) {
- dError("dnode:%d, localEp different from %s dnodeEps.json and need reconfigured", tsDeps.dnodeId, tsLocalEp);
- return -1;
- }
-
- terrno = 0;
- return 0;
-}
-
-static int32_t dnodeWriteEps() {
- FILE *fp = fopen(tsDeps.file, "w");
- if (!fp) {
- dError("failed to write %s since %s", tsDeps.file, strerror(errno));
- return -1;
- }
-
- int32_t len = 0;
- int32_t maxLen = 30000;
- char * content = calloc(1, maxLen + 1);
-
- len += snprintf(content + len, maxLen - len, "{\n");
- len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsDeps.dnodeNum);
- len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
- for (int32_t i = 0; i < tsDeps.dnodeNum; ++i) {
- SDnodeEp *ep = &tsDeps.dnodeList[i];
- len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId);
- len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
- len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort);
- if (i < tsDeps.dnodeNum - 1) {
- len += snprintf(content + len, maxLen - len, " },{\n");
- } else {
- len += snprintf(content + len, maxLen - len, " }]\n");
- }
- }
- len += snprintf(content + len, maxLen - len, "}\n");
-
- fwrite(content, 1, len, fp);
- taosFsyncFile(fileno(fp));
- fclose(fp);
- free(content);
- terrno = 0;
-
- dInfo("successed to write %s", tsDeps.file);
- return 0;
-}
-
-int32_t dnodeInitDnodeEps() {
- tsDeps.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
- if (tsDeps.dnodeHash == NULL) return -1;
-
- tsDeps.dnodeId = dnodeGetDnodeId();
- tsDeps.dnodeNum = 0;
- snprintf(tsDeps.file, sizeof(tsDeps.file), "%s/dnodeEps.json", tsDnodeDir);
- pthread_mutex_init(&tsDeps.mutex, NULL);
-
- int32_t ret = dnodeReadEps();
- if (ret == 0) {
- dInfo("dnode eps is initialized");
- }
-
- return ret;
-}
-
-void dnodeCleanupDnodeEps() {
- pthread_mutex_lock(&tsDeps.mutex);
-
- if (tsDeps.dnodeList != NULL) {
- free(tsDeps.dnodeList);
- tsDeps.dnodeList = NULL;
- }
-
- if (tsDeps.dnodeHash) {
- taosHashCleanup(tsDeps.dnodeHash);
- tsDeps.dnodeHash = NULL;
- }
-
- tsDeps.dnodeNum = 0;
- pthread_mutex_unlock(&tsDeps.mutex);
- pthread_mutex_destroy(&tsDeps.mutex);
-}
-
-void dnodeUpdateDnodeEps(SDnodeEps *data) {
- if (data == NULL || data->dnodeNum <= 0) return;
-
- data->dnodeNum = htonl(data->dnodeNum);
- for (int32_t i = 0; i < data->dnodeNum; ++i) {
- data->dnodeEps[i].dnodeId = htonl(data->dnodeEps[i].dnodeId);
- data->dnodeEps[i].dnodePort = htons(data->dnodeEps[i].dnodePort);
- }
-
- pthread_mutex_lock(&tsDeps.mutex);
-
- if (data->dnodeNum != tsDeps.dnodeNum) {
- dnodeResetEps(data);
- dnodeWriteEps();
- } else {
- int32_t size = data->dnodeNum * sizeof(SDnodeEp);
- if (memcmp(tsDeps.dnodeList, data->dnodeEps, size) != 0) {
- dnodeResetEps(data);
- dnodeWriteEps();
- }
- }
-
- pthread_mutex_unlock(&tsDeps.mutex);
-}
-
-bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
- bool changed = false;
-
- pthread_mutex_lock(&tsDeps.mutex);
-
- SDnodeEp *ep = taosHashGet(tsDeps.dnodeHash, &dnodeId, sizeof(int32_t));
- if (ep != NULL) {
- char epSaved[TSDB_EP_LEN + 1];
- snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
- changed = strcmp(epstr, epSaved) != 0;
- tstrncpy(epstr, epSaved, TSDB_EP_LEN);
- }
-
- pthread_mutex_unlock(&tsDeps.mutex);
-
- return changed;
-}
-
-void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
- pthread_mutex_lock(&tsDeps.mutex);
-
- SDnodeEp *ep = taosHashGet(tsDeps.dnodeHash, &dnodeId, sizeof(int32_t));
- if (ep != NULL) {
- if (port) *port = ep->dnodePort;
- if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
- if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
- }
-
- pthread_mutex_unlock(&tsDeps.mutex);
-}
diff --git a/source/server/dnode/src/dnodeEps.c b/source/server/dnode/src/dnodeEps.c
new file mode 100644
index 0000000000..5b843df2f2
--- /dev/null
+++ b/source/server/dnode/src/dnodeEps.c
@@ -0,0 +1,415 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "dnodeEps.h"
+#include "cJSON.h"
+#include "thash.h"
+
+static struct {
+ int32_t dnodeId;
+ int32_t dropped;
+ int64_t clusterId;
+ SDnodeEps *dnodeEps;
+ SHashObj *dnodeHash;
+ SRpcEpSet mnodeEpSetForShell;
+ SRpcEpSet mnodeEpSetForPeer;
+ char file[PATH_MAX + 20];
+ pthread_mutex_t mutex;
+} tsEps;
+
+void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
+ pthread_mutex_lock(&tsEps.mutex);
+ *epSet = tsEps.mnodeEpSetForPeer;
+ pthread_mutex_unlock(&tsEps.mutex);
+}
+
+void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
+ pthread_mutex_lock(&tsEps.mutex);
+ *epSet = tsEps.mnodeEpSetForShell;
+ pthread_mutex_unlock(&tsEps.mutex);
+}
+
+void dnodeUpdateMnodeEps(SRpcEpSet *ep) {
+ if (ep != NULL || ep->numOfEps <= 0) {
+ dError("mnode is changed, but content is invalid, discard it");
+ return;
+ }
+
+ pthread_mutex_lock(&tsEps.mutex);
+
+ dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse);
+
+ tsEps.mnodeEpSetForPeer = *ep;
+ for (int32_t i = 0; i < ep->numOfEps; ++i) {
+ ep->port[i] -= TSDB_PORT_DNODEDNODE;
+ dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
+ }
+ tsEps.mnodeEpSetForShell = *ep;
+
+ pthread_mutex_unlock(&tsEps.mutex);
+}
+
+void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
+ SRpcConnInfo connInfo = {0};
+ rpcGetConnInfo(rpcMsg->handle, &connInfo);
+
+ SRpcEpSet epSet = {0};
+ if (forShell) {
+ dnodeGetEpSetForShell(&epSet);
+ } else {
+ dnodeGetEpSetForPeer(&epSet);
+ }
+
+ dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse);
+
+ for (int32_t i = 0; i < epSet.numOfEps; ++i) {
+ dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
+ if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) {
+ if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) ||
+ (epSet.port[i] == tsServerPort && forShell)) {
+ epSet.inUse = (i + 1) % epSet.numOfEps;
+ dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse);
+ }
+ }
+
+ epSet.port[i] = htons(epSet.port[i]);
+ }
+
+ rpcSendRedirectRsp(rpcMsg->handle, &epSet);
+}
+
+static void dnodePrintEps() {
+ dDebug("print dnode list, num:%d", tsEps.dnodeEps->dnodeNum);
+ for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; i++) {
+ SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i];
+ dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode);
+ }
+}
+
+static void dnodeResetEps(SDnodeEps *data) {
+ assert(data != NULL);
+
+ int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp);
+
+ if (data->dnodeNum > tsEps.dnodeEps->dnodeNum) {
+ SDnodeEps *tmp = calloc(1, size);
+ if (tmp == NULL) return;
+
+ tfree(tsEps.dnodeEps);
+ tsEps.dnodeEps = tmp;
+ }
+
+ if (tsEps.dnodeEps != data) {
+ memcpy(tsEps.dnodeEps, data, size);
+ }
+
+ tsEps.mnodeEpSetForPeer.inUse = 0;
+ tsEps.mnodeEpSetForShell.inUse = 0;
+ int32_t index = 0;
+ for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; i++) {
+ SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i];
+ if (!ep->isMnode) continue;
+ if (index >= TSDB_MAX_REPLICA) continue;
+ strcpy(tsEps.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn);
+ strcpy(tsEps.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn);
+ tsEps.mnodeEpSetForShell.port[index] = ep->dnodePort;
+ tsEps.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort;
+ index++;
+ }
+
+ for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; ++i) {
+ SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i];
+ taosHashPut(tsEps.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp));
+ }
+
+ dnodePrintEps();
+}
+
+static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) {
+ bool changed = false;
+
+ pthread_mutex_lock(&tsEps.mutex);
+
+ SDnodeEp *ep = taosHashGet(tsEps.dnodeHash, &dnodeId, sizeof(int32_t));
+ if (ep != NULL) {
+ char epSaved[TSDB_EP_LEN + 1];
+ snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
+ changed = strcmp(epstr, epSaved) != 0;
+ tstrncpy(epstr, epSaved, TSDB_EP_LEN);
+ }
+
+ pthread_mutex_unlock(&tsEps.mutex);
+
+ return changed;
+}
+
+static int32_t dnodeReadEps() {
+ int32_t len = 0;
+ int32_t maxLen = 30000;
+ char *content = calloc(1, maxLen + 1);
+ cJSON *root = NULL;
+ FILE *fp = NULL;
+
+ fp = fopen(tsEps.file, "r");
+ if (!fp) {
+ dDebug("file %s not exist", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+
+ len = (int32_t)fread(content, 1, maxLen, fp);
+ if (len <= 0) {
+ dError("failed to read %s since content is null", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+
+ content[len] = 0;
+ root = cJSON_Parse(content);
+ if (root == NULL) {
+ dError("failed to read %s since invalid json format", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+
+ cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
+ if (!dnodeId || dnodeId->type != cJSON_String) {
+ dError("failed to read %s since dnodeId not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ tsEps.dnodeId = atoi(dnodeId->valuestring);
+
+ cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
+ if (!dropped || dropped->type != cJSON_String) {
+ dError("failed to read %s since dropped not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ tsEps.dropped = atoi(dropped->valuestring);
+
+ cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
+ if (!clusterId || clusterId->type != cJSON_String) {
+ dError("failed to read %s since clusterId not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ tsEps.clusterId = atoll(clusterId->valuestring);
+
+ cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos");
+ if (!dnodeInfos || dnodeInfos->type != cJSON_Array) {
+ dError("failed to read %s since dnodeInfos not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+
+ int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos);
+ if (dnodeInfosSize <= 0) {
+ dError("failed to read %s since dnodeInfos size:%d invalid", tsEps.file, dnodeInfosSize);
+ goto PRASE_EPS_OVER;
+ }
+
+ tsEps.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps));
+ if (tsEps.dnodeEps == NULL) {
+ dError("failed to calloc dnodeEpList since %s", strerror(errno));
+ goto PRASE_EPS_OVER;
+ }
+ tsEps.dnodeEps->dnodeNum = dnodeInfosSize;
+
+ for (int32_t i = 0; i < dnodeInfosSize; ++i) {
+ cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i);
+ if (dnodeInfo == NULL) break;
+
+ SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i];
+
+ cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId");
+ if (!dnodeId || dnodeId->type != cJSON_String) {
+ dError("failed to read %s, dnodeId not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ ep->dnodeId = atoi(dnodeId->valuestring);
+
+ cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode");
+ if (!isMnode || isMnode->type != cJSON_String) {
+ dError("failed to read %s, isMnode not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ ep->isMnode = atoi(isMnode->valuestring);
+
+ cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn");
+ if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
+ dError("failed to read %s, dnodeFqdn not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);
+
+ cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort");
+ if (!dnodePort || dnodePort->type != cJSON_String) {
+ dError("failed to read %s, dnodePort not found", tsEps.file);
+ goto PRASE_EPS_OVER;
+ }
+ ep->dnodePort = atoi(dnodePort->valuestring);
+ }
+
+ dInfo("succcessed to read file %s", tsEps.file);
+ dnodePrintEps();
+
+PRASE_EPS_OVER:
+ if (content != NULL) free(content);
+ if (root != NULL) cJSON_Delete(root);
+ if (fp != NULL) fclose(fp);
+
+ if (dnodeIsDnodeEpChanged(tsEps.dnodeId, tsLocalEp)) {
+ dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsEps.dnodeId, tsLocalEp);
+ return -1;
+ }
+
+ dnodeResetEps(tsEps.dnodeEps);
+
+ terrno = 0;
+ return 0;
+}
+
+static int32_t dnodeWriteEps() {
+ FILE *fp = fopen(tsEps.file, "w");
+ if (!fp) {
+ dError("failed to write %s since %s", tsEps.file, strerror(errno));
+ return -1;
+ }
+
+ int32_t len = 0;
+ int32_t maxLen = 30000;
+ char *content = calloc(1, maxLen + 1);
+
+ len += snprintf(content + len, maxLen - len, "{\n");
+ len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsEps.dnodeId);
+ len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsEps.dropped);
+ len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsEps.clusterId);
+ len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n");
+ for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; ++i) {
+ SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i];
+ len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId);
+ len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode);
+ len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn);
+ len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort);
+ if (i < tsEps.dnodeEps->dnodeNum - 1) {
+ len += snprintf(content + len, maxLen - len, " },{\n");
+ } else {
+ len += snprintf(content + len, maxLen - len, " }]\n");
+ }
+ }
+ len += snprintf(content + len, maxLen - len, "}\n");
+
+ fwrite(content, 1, len, fp);
+ taosFsyncFile(fileno(fp));
+ fclose(fp);
+ free(content);
+ terrno = 0;
+
+ dInfo("successed to write %s", tsEps.file);
+ return 0;
+}
+
+int32_t dnodeInitEps() {
+ tsEps.dnodeId = 0;
+ tsEps.dropped = 0;
+ tsEps.clusterId = 0;
+ tsEps.dnodeEps = NULL;
+ snprintf(tsEps.file, sizeof(tsEps.file), "%s/dnodeEps.json", tsDnodeDir);
+ pthread_mutex_init(&tsEps.mutex, NULL);
+
+ tsEps.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
+ if (tsEps.dnodeHash == NULL) return -1;
+
+ int32_t ret = dnodeReadEps();
+ if (ret == 0) {
+ dInfo("dnode eps is initialized");
+ }
+
+ return ret;
+}
+
+void dnodeCleanupEps() {
+ pthread_mutex_lock(&tsEps.mutex);
+
+ if (tsEps.dnodeEps != NULL) {
+ free(tsEps.dnodeEps);
+ tsEps.dnodeEps = NULL;
+ }
+
+ if (tsEps.dnodeHash) {
+ taosHashCleanup(tsEps.dnodeHash);
+ tsEps.dnodeHash = NULL;
+ }
+
+ pthread_mutex_unlock(&tsEps.mutex);
+ pthread_mutex_destroy(&tsEps.mutex);
+}
+
+void dnodeUpdateDnodeEps(SDnodeEps *data) {
+ if (data == NULL || data->dnodeNum <= 0) return;
+
+ pthread_mutex_lock(&tsEps.mutex);
+
+ if (data->dnodeNum != tsEps.dnodeEps->dnodeNum) {
+ dnodeResetEps(data);
+ dnodeWriteEps();
+ } else {
+ int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps);
+ if (memcmp(tsEps.dnodeEps, data, size) != 0) {
+ dnodeResetEps(data);
+ dnodeWriteEps();
+ }
+ }
+
+ pthread_mutex_unlock(&tsEps.mutex);
+}
+
+void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
+ pthread_mutex_lock(&tsEps.mutex);
+
+ SDnodeEp *ep = taosHashGet(tsEps.dnodeHash, &dnodeId, sizeof(int32_t));
+ if (ep != NULL) {
+ if (port) *port = ep->dnodePort;
+ if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN);
+ if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort);
+ }
+
+ pthread_mutex_unlock(&tsEps.mutex);
+}
+
+void dnodeUpdateCfg(SDnodeCfg *data) {
+ if (tsEps.dnodeId != 0 && !data->dropped) return;
+
+ pthread_mutex_lock(&tsEps.mutex);
+
+ tsEps.dnodeId = data->dnodeId;
+ tsEps.clusterId = data->clusterId;
+ tsEps.dropped = data->dropped;
+ dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId);
+
+ dnodeWriteEps();
+ pthread_mutex_unlock(&tsEps.mutex);
+}
+
+int32_t dnodeGetDnodeId() {
+ int32_t dnodeId = 0;
+ pthread_mutex_lock(&tsEps.mutex);
+ dnodeId = tsEps.dnodeId;
+ pthread_mutex_unlock(&tsEps.mutex);
+ return dnodeId;
+}
+
+int64_t dnodeGetClusterId() {
+ int64_t clusterId = 0;
+ pthread_mutex_lock(&tsEps.mutex);
+ clusterId = tsEps.clusterId;
+ pthread_mutex_unlock(&tsEps.mutex);
+ return clusterId;
+}
diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c
index 5f0dcc630b..d294143e57 100644
--- a/source/server/dnode/src/dnodeInt.c
+++ b/source/server/dnode/src/dnodeInt.c
@@ -14,67 +14,176 @@
*/
#define _DEFAULT_SOURCE
-#include "os.h"
-#if 0
-#include "qScript.h"
-#include "tfile.h"
-#include "tsync.h"
-#include "twal.h"
-#endif
-#include "tstep.h"
-#include "dnodeCfg.h"
#include "dnodeCheck.h"
-#include "dnodeDnodeEps.h"
-#include "dnodeMain.h"
-#include "dnodeMnodeEps.h"
-#include "dnodeStatus.h"
+#include "dnodeEps.h"
+#include "dnodeMsg.h"
#include "dnodeTrans.h"
#include "mnode.h"
+#include "sync.h"
+#include "tcache.h"
+#include "tconfig.h"
+#include "tnote.h"
+#include "tstep.h"
#include "vnode.h"
+#include "wal.h"
-static struct SSteps *tsSteps;
+static struct {
+ EDnStat runStatus;
+ SStartupStep startup;
+ SSteps *steps;
+} tsDnode;
-static int32_t dnodeInitVnodeModule(void **unused) {
+EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
+
+void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
+
+static void dnodeReportStartup(char *name, char *desc) {
+ SStartupStep *startup = &tsDnode.startup;
+ tstrncpy(startup->name, name, strlen(startup->name));
+ tstrncpy(startup->desc, desc, strlen(startup->desc));
+ startup->finished = 0;
+}
+
+static void dnodeReportStartupFinished(char *name, char *desc) {
+ SStartupStep *startup = &tsDnode.startup;
+ tstrncpy(startup->name, name, strlen(startup->name));
+ tstrncpy(startup->desc, desc, strlen(startup->desc));
+ startup->finished = 1;
+}
+
+void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); }
+
+static int32_t dnodeInitVnode() {
SVnodePara para;
- para.fp.GetDnodeEp = dnodeGetDnodeEp;
+ para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
return vnodeInit(para);
}
-static int32_t dnodeInitMnodeModule(void **unused) {
+static int32_t dnodeInitMnode() {
SMnodePara para;
- para.fp.GetDnodeEp = dnodeGetDnodeEp;
+ para.fp.GetDnodeEp = dnodeGetEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
- dnodeGetCfg(¶.dnodeId, para.clusterId);
+ para.dnodeId = dnodeGetDnodeId();
+ para.clusterId = dnodeGetClusterId();
return mnodeInit(para);
}
+static int32_t dnodeInitTfs() {}
+
+static int32_t dnodeInitMain() {
+ tsDnode.runStatus = DN_RUN_STAT_STOPPED;
+ tscEmbedded = 1;
+ taosIgnSIGPIPE();
+ taosBlockSIGPIPE();
+ taosResolveCRC();
+ taosInitGlobalCfg();
+ taosReadGlobalLogCfg();
+ taosSetCoreDump(tsEnableCoreFile);
+
+ if (!taosMkDir(tsLogDir)) {
+ printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
+ return -1;
+ }
+
+ char temp[TSDB_FILENAME_LEN];
+ sprintf(temp, "%s/taosdlog", tsLogDir);
+ if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
+ printf("failed to init log file\n");
+ }
+
+ if (!taosReadGlobalCfg()) {
+ taosPrintGlobalCfg();
+ dError("TDengine read global config failed");
+ return -1;
+ }
+
+ dInfo("start to initialize TDengine");
+
+ taosInitNotes();
+
+ return taosCheckGlobalCfg();
+}
+
+static void dnodeCleanupMain() {
+ taos_cleanup();
+ taosCloseLog();
+ taosStopCacheRefreshWorker();
+}
+
+static int32_t dnodeCheckRunning(char *dir) {
+ char filepath[256] = {0};
+ snprintf(filepath, sizeof(filepath), "%s/.running", dir);
+
+ FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
+ if (fd < 0) {
+ dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
+ return -1;
+ }
+
+ int32_t ret = taosLockFile(fd);
+ if (ret != 0) {
+ dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
+ taosCloseFile(fd);
+ return -1;
+ }
+
+ return 0;
+}
+
+static int32_t dnodeInitDir() {
+ sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
+ sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
+ sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
+
+ if (!taosMkDir(tsDnodeDir)) {
+ dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
+ return -1;
+ }
+
+ if (!taosMkDir(tsMnodeDir)) {
+ dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
+ return -1;
+ }
+
+ if (!taosMkDir(tsVnodeDir)) {
+ dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno));
+ return -1;
+ }
+
+ if (dnodeCheckRunning(tsDnodeDir) != 0) {
+ return -1;
+ }
+
+ return 0;
+}
+
+static void dnodeCleanupDir() {}
+
int32_t dnodeInit() {
- tsSteps = taosStepInit(24, dnodeReportStartup);
- if (tsSteps == NULL) return -1;
+ SSteps *steps = taosStepInit(24, dnodeReportStartup);
+ if (steps == NULL) return -1;
- taosStepAdd(tsSteps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
- taosStepAdd(tsSteps, "dnode-storage", dnodeInitStorage, dnodeCleanupStorage);
- //taosStepAdd(tsSteps, "dnode-tfs", tfInit, tfCleanup);
- taosStepAdd(tsSteps, "dnode-rpc", rpcInit, rpcCleanup);
- taosStepAdd(tsSteps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
- taosStepAdd(tsSteps, "dnode-cfg", dnodeInitCfg, dnodeCleanupCfg);
- taosStepAdd(tsSteps, "dnode-deps", dnodeInitDnodeEps, dnodeCleanupDnodeEps);
- taosStepAdd(tsSteps, "dnode-meps", dnodeInitMnodeEps, dnodeCleanupMnodeEps);
- //taosStepAdd(tsSteps, "dnode-wal", walInit, walCleanUp);
- //taosStepAdd(tsSteps, "dnode-sync", syncInit, syncCleanUp);
- taosStepAdd(tsSteps, "dnode-vnode", dnodeInitVnodeModule, vnodeCleanup);
- taosStepAdd(tsSteps, "dnode-mnode", dnodeInitMnodeModule, mnodeCleanup);
- taosStepAdd(tsSteps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
- taosStepAdd(tsSteps, "dnode-status", dnodeInitStatus, dnodeCleanupStatus);
- //taosStepAdd(tsSteps, "dnode-script",scriptEnvPoolInit, scriptEnvPoolCleanup);
+ taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
+ taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir);
+ taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
+ taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
+ taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL);
+ taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
+ taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
+ taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps);
+ taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup);
+ taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup);
+ taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
+ taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg);
- taosStepExec(tsSteps);
+ tsDnode.steps = steps;
+ taosStepExec(tsDnode.steps);
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
dnodeReportStartupFinished("TDengine", "initialized successfully");
@@ -86,7 +195,7 @@ int32_t dnodeInit() {
void dnodeCleanup() {
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
dnodeSetRunStat(DN_RUN_STAT_STOPPED);
- taosStepCleanup(tsSteps);
- tsSteps = NULL;
+ taosStepCleanup(tsDnode.steps);
+ tsDnode.steps = NULL;
}
}
diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c
deleted file mode 100644
index 74309dda69..0000000000
--- a/source/server/dnode/src/dnodeMain.c
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "tcache.h"
-#include "tconfig.h"
-#include "tglobal.h"
-#if 0
-#include "tfs.h"
-#endif
-#include "tnote.h"
-#include "tcompression.h"
-#include "ttimer.h"
-#include "dnodeCfg.h"
-#include "dnodeMain.h"
-#include "mnode.h"
-
-static struct {
- EDnStat runStatus;
- void * dnodeTimer;
- SStartupStep startup;
-} tsDmain;
-
-static void dnodeCheckDataDirOpenned(char *dir) {
-#if 0
- char filepath[256] = {0};
- snprintf(filepath, sizeof(filepath), "%s/.running", dir);
-
- int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
- if (fd < 0) {
- dError("failed to open lock file:%s, reason: %s, quit", filepath, strerror(errno));
- exit(0);
- }
-
- int32_t ret = flock(fd, LOCK_EX | LOCK_NB);
- if (ret != 0) {
- dError("failed to lock file:%s ret:%d since %s, database may be running, quit", filepath, ret, strerror(errno));
- close(fd);
- exit(0);
- }
-#endif
-}
-
-int32_t dnodeInitMain() {
- tsDmain.runStatus = DN_RUN_STAT_STOPPED;
- tsDmain.dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR");
- if (tsDmain.dnodeTimer == NULL) {
- dError("failed to init dnode timer");
- return -1;
- }
-
- tscEmbedded = 1;
- taosIgnSIGPIPE();
- taosBlockSIGPIPE();
- taosResolveCRC();
- taosInitGlobalCfg();
- taosReadGlobalLogCfg();
- taosSetCoreDump(tsEnableCoreFile);
-
- if (!taosMkDir(tsLogDir)) {
- printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
- return -1;
- }
-
- char temp[TSDB_FILENAME_LEN];
- sprintf(temp, "%s/taosdlog", tsLogDir);
- if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
- printf("failed to init log file\n");
- }
-
- if (!taosReadGlobalCfg()) {
- taosPrintGlobalCfg();
- dError("TDengine read global config failed");
- return -1;
- }
-
- dInfo("start to initialize TDengine");
-
- taosInitNotes();
-
- return taosCheckGlobalCfg();
-}
-
-void dnodeCleanupMain() {
- if (tsDmain.dnodeTimer != NULL) {
- taosTmrCleanUp(tsDmain.dnodeTimer);
- tsDmain.dnodeTimer = NULL;
- }
-
-#if 0
- taos_cleanup();
-#endif
- taosCloseLog();
- taosStopCacheRefreshWorker();
-}
-
-int32_t dnodeInitStorage() {
-#ifdef TD_TSZ
- // compress module init
- tsCompressInit();
-#endif
-
- // storage module init
- if (tsDiskCfgNum == 1 && !taosMkDir(tsDataDir)) {
- dError("failed to create dir:%s since %s", tsDataDir, strerror(errno));
- return -1;
- }
-
-#if 0
- if (tfsInit(tsDiskCfg, tsDiskCfgNum) < 0) {
- dError("failed to init TFS since %s", tstrerror(terrno));
- return -1;
- }
-
- strncpy(tsDataDir, TFS_PRIMARY_PATH(), TSDB_FILENAME_LEN);
-#endif
- sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
- sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
- sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
-
- if (!taosMkDir(tsMnodeDir)) {
- dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
- return -1;
- }
-
- if (!taosMkDir(tsDnodeDir)) {
- dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
- return -1;
- }
-
-#if 0
- if (tfsMkdir("vnode") < 0) {
- dError("failed to create vnode dir since %s", tstrerror(terrno));
- return -1;
- }
-
- if (tfsMkdir("vnode_bak") < 0) {
- dError("failed to create vnode_bak dir since %s", tstrerror(terrno));
- return -1;
- }
-
- TDIR *tdir = tfsOpendir("vnode_bak/.staging");
- bool stagingNotEmpty = tfsReaddir(tdir) != NULL;
- tfsClosedir(tdir);
-
- if (stagingNotEmpty) {
- dError("vnode_bak/.staging dir not empty, fix it first.");
- return -1;
- }
-
- if (tfsMkdir("vnode_bak/.staging") < 0) {
- dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno));
- return -1;
- }
-
- dnodeCheckDataDirOpenned(tsDnodeDir);
-
- taosGetDisk();
- dnodePrintDiskInfo();
-#endif
-
- dInfo("dnode storage is initialized at %s", tsDnodeDir);
- return 0;
-}
-
-void dnodeCleanupStorage() {
-#if 0
- // storage destroy
- tfsDestroy();
-
- #ifdef TD_TSZ
- // compress destroy
- tsCompressExit();
- #endif
-#endif
-}
-
-void dnodeReportStartup(char *name, char *desc) {
- SStartupStep *startup = &tsDmain.startup;
- tstrncpy(startup->name, name, strlen(startup->name));
- tstrncpy(startup->desc, desc, strlen(startup->desc));
- startup->finished = 0;
-}
-
-void dnodeReportStartupFinished(char *name, char *desc) {
- SStartupStep *startup = &tsDmain.startup;
- tstrncpy(startup->name, name, strlen(startup->name));
- tstrncpy(startup->desc, desc, strlen(startup->desc));
- startup->finished = 1;
-}
-
-void dnodeProcessStartupReq(SRpcMsg *pMsg) {
- dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
-
- SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
- memcpy(pStep, &tsDmain.startup, sizeof(SStartupStep));
-
- dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
-
- SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
- rpcSendResponse(&rpcRsp);
- rpcFreeCont(pMsg->pCont);
-}
-
-static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
- SCreateMnodeMsg *pCfg = pMsg->pCont;
- pCfg->dnodeId = htonl(pCfg->dnodeId);
- if (pCfg->dnodeId != dnodeGetDnodeId()) {
- dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId,
- dnodeGetDnodeId());
- return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
- }
-
- if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
- dDebug("dnodeEp:%s, in create meps msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
- return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
- }
-
- dDebug("dnode:%d, create meps msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum);
- for (int32_t i = 0; i < pCfg->mnodes.mnodeNum; ++i) {
- pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId);
- dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
- }
-
- if (mnodeGetStatus() == MN_STATUS_READY) return 0;
-
- return mnodeDeploy();
-}
-
-void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
- int32_t code = dnodeStartMnode(pMsg);
-
- SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
-
- rpcSendResponse(&rspMsg);
- rpcFreeCont(pMsg->pCont);
-}
-
-void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
- SCfgDnodeMsg *pCfg = pMsg->pCont;
-
- int32_t code = taosCfgDynamicOptions(pCfg->config);
-
- SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
-
- rpcSendResponse(&rspMsg);
- rpcFreeCont(pMsg->pCont);
-}
-
-EDnStat dnodeGetRunStat() { return tsDmain.runStatus; }
-
-void dnodeSetRunStat(EDnStat stat) { tsDmain.runStatus = stat; }
-
-void* dnodeGetTimer() { return tsDmain.dnodeTimer; }
\ No newline at end of file
diff --git a/source/server/dnode/src/dnodeMnodeEps.c b/source/server/dnode/src/dnodeMnodeEps.c
deleted file mode 100644
index f34ee48238..0000000000
--- a/source/server/dnode/src/dnodeMnodeEps.c
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "cJSON.h"
-#include "tglobal.h"
-#include "dnodeCfg.h"
-#include "dnodeDnodeEps.h"
-#include "dnodeMnodeEps.h"
-#include "mnode.h"
-
-static struct {
- SRpcEpSet mnodeEpSet;
- SMInfos mnodeInfos;
- char file[PATH_MAX + 20];
- pthread_mutex_t mutex;
-} tsDmeps;
-
-
-static void dnodePrintMnodeEps() {
- SRpcEpSet *epset = &tsDmeps.mnodeEpSet;
- dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse);
- for (int32_t i = 0; i < epset->numOfEps; i++) {
- dInfo("ep index:%d, %s:%u", i, epset->fqdn[i], epset->port[i]);
- }
-}
-
-static void dnodeResetMnodeEps(SMInfos *mInfos) {
- if (mInfos == NULL || mInfos->mnodeNum == 0) {
- tsDmeps.mnodeEpSet.numOfEps = 1;
- taosGetFqdnPortFromEp(tsFirst, tsDmeps.mnodeEpSet.fqdn[0], &tsDmeps.mnodeEpSet.port[0]);
-
- if (strcmp(tsSecond, tsFirst) != 0) {
- tsDmeps.mnodeEpSet.numOfEps = 2;
- taosGetFqdnPortFromEp(tsSecond, tsDmeps.mnodeEpSet.fqdn[1], &tsDmeps.mnodeEpSet.port[1]);
- }
- dnodePrintMnodeEps();
- return;
- }
-
- int32_t size = sizeof(SMInfos);
- memcpy(&tsDmeps.mnodeInfos, mInfos, size);
-
- tsDmeps.mnodeEpSet.inUse = tsDmeps.mnodeInfos.inUse;
- tsDmeps.mnodeEpSet.numOfEps = tsDmeps.mnodeInfos.mnodeNum;
- for (int32_t i = 0; i < tsDmeps.mnodeInfos.mnodeNum; i++) {
- taosGetFqdnPortFromEp(tsDmeps.mnodeInfos.mnodeInfos[i].mnodeEp, tsDmeps.mnodeEpSet.fqdn[i], &tsDmeps.mnodeEpSet.port[i]);
- }
-
- dnodePrintMnodeEps();
-}
-
-static int32_t dnodeWriteMnodeEps() {
- FILE *fp = fopen(tsDmeps.file, "w");
- if (!fp) {
- dError("failed to write %s since %s", tsDmeps.file, strerror(errno));
- return -1;
- }
-
- int32_t len = 0;
- int32_t maxLen = 2000;
- char * content = calloc(1, maxLen + 1);
-
- len += snprintf(content + len, maxLen - len, "{\n");
- len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDmeps.mnodeInfos.inUse);
- len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDmeps.mnodeInfos.mnodeNum);
- len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
- for (int32_t i = 0; i < tsDmeps.mnodeInfos.mnodeNum; i++) {
- len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDmeps.mnodeInfos.mnodeInfos[i].mnodeId);
- len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDmeps.mnodeInfos.mnodeInfos[i].mnodeEp);
- if (i < tsDmeps.mnodeInfos.mnodeNum - 1) {
- len += snprintf(content + len, maxLen - len, " },{\n");
- } else {
- len += snprintf(content + len, maxLen - len, " }]\n");
- }
- }
- len += snprintf(content + len, maxLen - len, "}\n");
-
- fwrite(content, 1, len, fp);
- taosFsyncFile(fileno(fp));
- fclose(fp);
- free(content);
- terrno = 0;
-
- dInfo("successed to write %s", tsDmeps.file);
- return 0;
-}
-
-static int32_t dnodeReadMnodeEps() {
- int32_t len = 0;
- int32_t maxLen = 2000;
- char * content = calloc(1, maxLen + 1);
- cJSON * root = NULL;
- FILE * fp = NULL;
- SMInfos mInfos = {0};
- bool nodeChanged = false;
-
- fp = fopen(tsDmeps.file, "r");
- if (!fp) {
- dDebug("file %s not exist", tsDmeps.file);
- goto PARSE_MINFOS_OVER;
- }
-
- len = (int32_t)fread(content, 1, maxLen, fp);
- if (len <= 0) {
- dError("failed to read %s since content is null", tsDmeps.file);
- goto PARSE_MINFOS_OVER;
- }
-
- content[len] = 0;
- root = cJSON_Parse(content);
- if (root == NULL) {
- dError("failed to read %s since invalid json format", tsDmeps.file);
- goto PARSE_MINFOS_OVER;
- }
-
- cJSON *inUse = cJSON_GetObjectItem(root, "inUse");
- if (!inUse || inUse->type != cJSON_Number) {
- dError("failed to read mnodeEpSet.json since inUse not found");
- goto PARSE_MINFOS_OVER;
- }
- tsDmeps.mnodeInfos.inUse = (int8_t)inUse->valueint;
-
- cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum");
- if (!nodeNum || nodeNum->type != cJSON_Number) {
- dError("failed to read mnodeEpSet.json since nodeNum not found");
- goto PARSE_MINFOS_OVER;
- }
- mInfos.mnodeNum = (int8_t)nodeNum->valueint;
-
- cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
- if (!nodeInfos || nodeInfos->type != cJSON_Array) {
- dError("failed to read mnodeEpSet.json since nodeInfos not found");
- goto PARSE_MINFOS_OVER;
- }
-
- int32_t size = cJSON_GetArraySize(nodeInfos);
- if (size != mInfos.mnodeNum) {
- dError("failed to read mnodeEpSet.json since nodeInfos size not matched");
- goto PARSE_MINFOS_OVER;
- }
-
- for (int32_t i = 0; i < size; ++i) {
- cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i);
- if (nodeInfo == NULL) continue;
-
- cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
- if (!nodeId || nodeId->type != cJSON_Number) {
- dError("failed to read mnodeEpSet.json since nodeId not found");
- goto PARSE_MINFOS_OVER;
- }
-
- cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp");
- if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) {
- dError("failed to read mnodeEpSet.json since nodeName not found");
- goto PARSE_MINFOS_OVER;
- }
-
- SMInfo *mInfo = &mInfos.mnodeInfos[i];
- mInfo->mnodeId = (int32_t)nodeId->valueint;
- tstrncpy(mInfo->mnodeEp, nodeEp->valuestring, TSDB_EP_LEN);
-
- bool changed = dnodeIsDnodeEpChanged(mInfo->mnodeId, mInfo->mnodeEp);
- if (changed) nodeChanged = changed;
- }
-
- dInfo("successed to read file %s", tsDmeps.file);
-
-PARSE_MINFOS_OVER:
- if (content != NULL) free(content);
- if (root != NULL) cJSON_Delete(root);
- if (fp != NULL) fclose(fp);
- terrno = 0;
-
- for (int32_t i = 0; i < mInfos.mnodeNum; ++i) {
- SMInfo *mInfo = &mInfos.mnodeInfos[i];
- dnodeGetDnodeEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL);
- }
-
- dnodeResetMnodeEps(&mInfos);
-
- if (nodeChanged) {
- dnodeWriteMnodeEps();
- }
-
- return 0;
-}
-
-void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
- SRpcConnInfo connInfo = {0};
- rpcGetConnInfo(rpcMsg->handle, &connInfo);
-
- SRpcEpSet epSet = {0};
- if (forShell) {
- dnodeGetEpSetForShell(&epSet);
- } else {
- dnodeGetEpSetForPeer(&epSet);
- }
-
- dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType],
- taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse);
-
- for (int32_t i = 0; i < epSet.numOfEps; ++i) {
- dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]);
- if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) {
- if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) ||
- (epSet.port[i] == tsServerPort && forShell)) {
- epSet.inUse = (i + 1) % epSet.numOfEps;
- dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse);
- }
- }
-
- epSet.port[i] = htons(epSet.port[i]);
- }
-
- rpcSendRedirectRsp(rpcMsg->handle, &epSet);
-}
-
-int32_t dnodeInitMnodeEps() {
- snprintf(tsDmeps.file, sizeof(tsDmeps.file), "%s/mnodeEpSet.json", tsDnodeDir);
- pthread_mutex_init(&tsDmeps.mutex, NULL);
-
- dnodeResetMnodeEps(NULL);
- int32_t ret = dnodeReadMnodeEps();
- if (ret == 0) {
- dInfo("dnode mInfos is initialized");
- }
-
- return ret;
-}
-
-void dnodeCleanupMnodeEps() {
- pthread_mutex_destroy(&tsDmeps.mutex);
-}
-
-void dnodeUpdateMnodeFromStatus(SMInfos *mInfos) {
- if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) {
- dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum);
- return;
- }
-
- for (int32_t i = 0; i < mInfos->mnodeNum; ++i) {
- SMInfo *minfo = &mInfos->mnodeInfos[i];
- minfo->mnodeId = htonl(minfo->mnodeId);
- if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) {
- dError("invalid mInfo:%d since id:%d and ep:%s invalid", i, minfo->mnodeId, minfo->mnodeEp);
- return;
- }
- }
-
- pthread_mutex_lock(&tsDmeps.mutex);
- if (mInfos->mnodeNum != tsDmeps.mnodeInfos.mnodeNum) {
- dnodeResetMnodeEps(mInfos);
- dnodeWriteMnodeEps();
- } else {
- int32_t size = sizeof(SMInfos);
- if (memcmp(mInfos, &tsDmeps.mnodeInfos, size) != 0) {
- dnodeResetMnodeEps(mInfos);
- dnodeWriteMnodeEps();
- }
- }
- pthread_mutex_unlock(&tsDmeps.mutex);
-}
-
-void dnodeUpdateMnodeFromPeer(SRpcEpSet *ep) {
- if (ep->numOfEps <= 0) {
- dError("mInfos is changed, but content is invalid, discard it");
- return;
- }
-
- pthread_mutex_lock(&tsDmeps.mutex);
-
- dInfo("mInfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse);
- for (int32_t i = 0; i < ep->numOfEps; ++i) {
- ep->port[i] -= TSDB_PORT_DNODEDNODE;
- dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]);
- }
- tsDmeps.mnodeEpSet = *ep;
-
- pthread_mutex_unlock(&tsDmeps.mutex);
-}
-
-void dnodeGetEpSetForPeer(SRpcEpSet *epSet) {
- pthread_mutex_lock(&tsDmeps.mutex);
-
- *epSet = tsDmeps.mnodeEpSet;
- for (int32_t i = 0; i < epSet->numOfEps; ++i) {
- epSet->port[i] += TSDB_PORT_DNODEDNODE;
- }
-
- pthread_mutex_unlock(&tsDmeps.mutex);
-}
-
-void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
- pthread_mutex_lock(&tsDmeps.mutex);
- *epSet = tsDmeps.mnodeEpSet;
- pthread_mutex_unlock(&tsDmeps.mutex);
-}
diff --git a/source/server/dnode/src/dnodeMsg.c b/source/server/dnode/src/dnodeMsg.c
new file mode 100644
index 0000000000..efdbfa14ca
--- /dev/null
+++ b/source/server/dnode/src/dnodeMsg.c
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "tthread.h"
+#include "dnodeEps.h"
+#include "dnodeMsg.h"
+#include "mnode.h"
+#include "vnode.h"
+#include "ttime.h"
+
+static struct {
+ pthread_t *threadId;
+ bool stop;
+ uint32_t rebootTime;
+} tsMsg;
+
+static void dnodeSendStatusMsg() {
+ int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
+ SStatusMsg *pStatus = rpcMallocCont(contLen);
+ if (pStatus == NULL) {
+ dError("failed to malloc status message");
+ return;
+ }
+
+ pStatus->version = htonl(tsVersion);
+ pStatus->dnodeId = htonl(dnodeGetDnodeId());
+ tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
+ pStatus->clusterId = htobe64(dnodeGetClusterId());
+ pStatus->lastReboot = htonl(tsMsg.rebootTime);
+ pStatus->numOfCores = htonl(tsNumOfCores);
+ pStatus->diskAvailable = tsAvailDataDirGB;
+
+ // fill cluster cfg parameters
+ pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
+ pStatus->clusterCfg.checkTime = 0;
+ tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
+ char timestr[32] = "1970-01-01 00:00:00.00";
+ (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
+ tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
+ tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
+
+ vnodeGetStatus(pStatus);
+ contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
+ pStatus->openVnodes = htons(pStatus->openVnodes);
+
+ SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS};
+
+ dnodeSendMsgToMnode(&rpcMsg);
+}
+
+void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
+ dTrace("status rsp is received, code:%s", tstrerror(pMsg->code));
+ if (pMsg->code != TSDB_CODE_SUCCESS) return;
+
+ SStatusRsp *pStatusRsp = pMsg->pCont;
+
+ SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
+ pCfg->dnodeId = htonl(pCfg->dnodeId);
+ pCfg->clusterId = htobe64(pCfg->clusterId);
+ pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
+ pCfg->numOfDnodes = htonl(pCfg->numOfDnodes);
+ dnodeUpdateCfg(pCfg);
+
+ if (pCfg->dropped) {
+ dError("status rsp is received, and set dnode to drop status");
+ return;
+ }
+
+ vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
+
+ SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
+ eps->dnodeNum = htonl(eps->dnodeNum);
+ for (int32_t i = 0; i < eps->dnodeNum; ++i) {
+ eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
+ eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
+ }
+
+ dnodeUpdateDnodeEps(eps);
+}
+
+static void *dnodeThreadRoutine(void *param) {
+ int32_t ms = tsStatusInterval * 1000;
+ while (!tsMsg.stop) {
+ taosMsleep(ms);
+ dnodeSendStatusMsg();
+ }
+}
+
+int32_t dnodeInitMsg() {
+ tsMsg.stop = false;
+ tsMsg.rebootTime = taosGetTimestampSec();
+ tsMsg.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
+ if (tsMsg.threadId == NULL) {
+ return -1;
+ }
+
+ dInfo("dnode msg is initialized");
+ return 0;
+}
+
+void dnodeCleanupMsg() {
+ if (tsMsg.threadId != NULL) {
+ tsMsg.stop = true;
+ taosDestoryThread(tsMsg.threadId);
+ tsMsg.threadId = NULL;
+ }
+
+ dInfo("dnode msg is cleanuped");
+}
+
+static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
+ SCreateMnodeMsg *pCfg = pMsg->pCont;
+ pCfg->dnodeId = htonl(pCfg->dnodeId);
+ pCfg->mnodeNum = htonl(pCfg->mnodeNum);
+ for (int32_t i = 0; i < pCfg->mnodeNum; ++i) {
+ pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId);
+ pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort);
+ }
+
+ if (pCfg->dnodeId != dnodeGetDnodeId()) {
+ dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
+ return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
+ }
+
+ if (mnodeGetStatus() == MN_STATUS_READY) return 0;
+
+ return mnodeDeploy();
+}
+
+void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
+ int32_t code = dnodeStartMnode(pMsg);
+
+ SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
+
+ rpcSendResponse(&rspMsg);
+ rpcFreeCont(pMsg->pCont);
+}
+
+void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
+ SCfgDnodeMsg *pCfg = pMsg->pCont;
+
+ int32_t code = taosCfgDynamicOptions(pCfg->config);
+
+ SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
+
+ rpcSendResponse(&rspMsg);
+ rpcFreeCont(pMsg->pCont);
+}
+
+void dnodeProcessStartupReq(SRpcMsg *pMsg) {
+ dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
+
+ SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
+ dnodeGetStartup(pStep);
+
+ dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
+
+ SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
+ rpcSendResponse(&rpcRsp);
+ rpcFreeCont(pMsg->pCont);
+}
diff --git a/source/server/dnode/src/dnodeStatus.c b/source/server/dnode/src/dnodeStatus.c
deleted file mode 100644
index 4edcc2e8c2..0000000000
--- a/source/server/dnode/src/dnodeStatus.c
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "os.h"
-#include "ttime.h"
-#include "ttimer.h"
-#include "tglobal.h"
-#include "dnodeCfg.h"
-#include "dnodeDnodeEps.h"
-#include "dnodeMnodeEps.h"
-#include "dnodeStatus.h"
-#include "dnodeMain.h"
-#include "vnode.h"
-
-static struct {
- void * dnodeTimer;
- void * statusTimer;
- uint32_t rebootTime;
-} tsStatus;
-
-static void dnodeSendStatusMsg(void *handle, void *tmrId) {
- if (tsStatus.statusTimer == NULL) {
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer);
- dError("failed to start status timer");
- return;
- }
-
- int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
- SStatusMsg *pStatus = rpcMallocCont(contLen);
- if (pStatus == NULL) {
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer);
- dError("failed to malloc status message");
- return;
- }
-
- dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId);
- pStatus->dnodeId = htonl(dnodeGetDnodeId());
- pStatus->version = htonl(tsVersion);
- pStatus->lastReboot = htonl(tsStatus.rebootTime);
- pStatus->numOfCores = htons((uint16_t)tsNumOfCores);
- pStatus->diskAvailable = tsAvailDataDirGB;
- pStatus->alternativeRole = tsAlternativeRole;
- tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
-
- // fill cluster cfg parameters
- pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes);
- pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum);
- pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold);
- pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
- pStatus->clusterCfg.maxtablesPerVnode = htonl(tsMaxTablePerVnode);
- pStatus->clusterCfg.maxVgroupsPerDb = htonl(tsMaxVgroupsPerDb);
- tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN);
- tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
- pStatus->clusterCfg.checkTime = 0;
- char timestr[32] = "1970-01-01 00:00:00.00";
- (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
- tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
- tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
-
- pStatus->clusterCfg.enableBalance = tsEnableBalance;
- pStatus->clusterCfg.flowCtrl = tsEnableFlowCtrl;
- pStatus->clusterCfg.slaveQuery = tsEnableSlaveQuery;
- pStatus->clusterCfg.adjustMaster = tsEnableAdjustMaster;
-
- vnodeGetStatus(pStatus);
- contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
- pStatus->openVnodes = htons(pStatus->openVnodes);
-
- SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS};
-
- dnodeSendMsgToMnode(&rpcMsg);
-}
-
-void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
- if (pMsg->code != TSDB_CODE_SUCCESS) {
- dError("status rsp is received, error:%s", tstrerror(pMsg->code));
- if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) {
- char clusterId[TSDB_CLUSTER_ID_LEN];
- dnodeGetClusterId(clusterId);
- if (clusterId[0] != '\0') {
- dnodeSetDropped();
- dError("exit zombie dropped dnode");
- exit(EXIT_FAILURE);
- }
- }
-
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer);
- return;
- }
-
- SStatusRsp *pStatusRsp = pMsg->pCont;
- SMInfos * minfos = &pStatusRsp->mnodes;
- dnodeUpdateMnodeFromStatus(minfos);
-
- SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
- pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
- pCfg->moduleStatus = htonl(pCfg->moduleStatus);
- pCfg->dnodeId = htonl(pCfg->dnodeId);
- dnodeUpdateCfg(pCfg);
-
- vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
-
- SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
- dnodeUpdateDnodeEps(pEps);
-
- taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer);
-}
-
-int32_t dnodeInitStatus() {
- tsStatus.statusTimer = NULL;
- tsStatus.dnodeTimer = dnodeGetTimer();
- tsStatus.rebootTime = taosGetTimestampSec();
- taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer);
- dInfo("dnode status timer is initialized");
- return TSDB_CODE_SUCCESS;
-}
-
-void dnodeCleanupStatus() {
- if (tsStatus.statusTimer != NULL) {
- taosTmrStopA(&tsStatus.statusTimer);
- tsStatus.statusTimer = NULL;
- }
-}
diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c
index 7b48bea622..20601f23e7 100644
--- a/source/server/dnode/src/dnodeTrans.c
+++ b/source/server/dnode/src/dnodeTrans.c
@@ -21,9 +21,8 @@
#define _DEFAULT_SOURCE
#include "dnodeTrans.h"
-#include "dnodeMain.h"
-#include "dnodeMnodeEps.h"
-#include "dnodeStatus.h"
+#include "dnodeEps.h"
+#include "dnodeMsg.h"
#include "mnode.h"
#include "vnode.h"
@@ -97,8 +96,8 @@ static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
- tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
- /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/
+ tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
+ tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
@@ -139,7 +138,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
}
if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
- dnodeUpdateMnodeFromPeer(pEpSet);
+ dnodeUpdateMnodeEps(pEpSet);
}
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];