From 73b7bef1e195c28cd8209879bcb2d1d5d40efd8c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 25 Oct 2021 17:39:21 +0800 Subject: [PATCH] [TD-10430] refact dnode module --- include/common/taosmsg.h | 69 +-- include/server/dnode/dnode.h | 2 +- include/server/mnode/mnode.h | 2 +- source/client/CMakeLists.txt | 2 +- source/client/src/client.c | 2 + source/libs/sync/src/sync.c | 5 +- source/libs/wal/src/wal.c | 7 +- source/server/dnode/CMakeLists.txt | 3 + source/server/dnode/inc/dnodeDnodeEps.h | 34 -- .../dnode/inc/{dnodeCfg.h => dnodeEps.h} | 21 +- source/server/dnode/inc/dnodeInt.h | 7 + source/server/dnode/inc/dnodeMain.h | 47 -- source/server/dnode/inc/dnodeMnodeEps.h | 36 -- .../dnode/inc/{dnodeStatus.h => dnodeMsg.h} | 7 +- source/server/dnode/src/dnodeCfg.c | 178 -------- source/server/dnode/src/dnodeDnodeEps.c | 284 ------------ source/server/dnode/src/dnodeEps.c | 415 ++++++++++++++++++ source/server/dnode/src/dnodeInt.c | 187 ++++++-- source/server/dnode/src/dnodeMain.c | 267 ----------- source/server/dnode/src/dnodeMnodeEps.c | 311 ------------- source/server/dnode/src/dnodeMsg.c | 174 ++++++++ source/server/dnode/src/dnodeStatus.c | 136 ------ source/server/dnode/src/dnodeTrans.c | 11 +- 23 files changed, 808 insertions(+), 1399 deletions(-) delete mode 100644 source/server/dnode/inc/dnodeDnodeEps.h rename source/server/dnode/inc/{dnodeCfg.h => dnodeEps.h} (62%) delete mode 100644 source/server/dnode/inc/dnodeMain.h delete mode 100644 source/server/dnode/inc/dnodeMnodeEps.h rename source/server/dnode/inc/{dnodeStatus.h => dnodeMsg.h} (81%) delete mode 100644 source/server/dnode/src/dnodeCfg.c delete mode 100644 source/server/dnode/src/dnodeDnodeEps.c create mode 100644 source/server/dnode/src/dnodeEps.c delete mode 100644 source/server/dnode/src/dnodeMain.c delete mode 100644 source/server/dnode/src/dnodeMnodeEps.c create mode 100644 source/server/dnode/src/dnodeMsg.c delete mode 100644 source/server/dnode/src/dnodeStatus.c diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 037199d3bd..561a81167f 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -657,15 +657,18 @@ typedef struct SVgroupAccess { } SVgroupAccess; typedef struct { - int32_t dnodeId; - uint32_t moduleStatus; - uint32_t numOfVnodes; - char clusterId[TSDB_CLUSTER_ID_LEN]; - char reserved[16]; + int32_t dnodeId; + int8_t dropped; + char reserved[19]; + int64_t clusterId; + int32_t numOfDnodes; + int32_t numOfVnodes; } SDnodeCfg; typedef struct { int32_t dnodeId; + int8_t isMnode; + int8_t reserved; uint16_t dnodePort; char dnodeFqdn[TSDB_FQDN_LEN]; } SDnodeEp; @@ -676,55 +679,29 @@ typedef struct { } SDnodeEps; typedef struct { - int32_t mnodeId; - char mnodeEp[TSDB_EP_LEN]; -} SMInfo; - -typedef struct SMInfos { - int8_t inUse; - int8_t mnodeNum; - SMInfo mnodeInfos[TSDB_MAX_REPLICA]; -} SMInfos; - -typedef struct { - int32_t numOfMnodes; // tsNumOfMnodes - int32_t mnodeEqualVnodeNum; // tsMnodeEqualVnodeNum - int32_t offlineThreshold; // tsOfflineThreshold - int32_t statusInterval; // tsStatusInterval - int32_t maxtablesPerVnode; - int32_t maxVgroupsPerDb; - char arbitrator[TSDB_EP_LEN]; // tsArbitrator - char reserve[2]; // to solve arm32 bus error - char timezone[64]; // tsTimezone - int64_t checkTime; // 1970-01-01 00:00:00.000 - char locale[TSDB_LOCALE_LEN]; // tsLocale - char charset[TSDB_LOCALE_LEN]; // tsCharset - int8_t enableBalance; // tsEnableBalance - int8_t flowCtrl; - int8_t slaveQuery; - int8_t adjustMaster; - int8_t reserved[4]; + int32_t statusInterval; // tsStatusInterval + int8_t reserved[36]; + int64_t checkTime; // 1970-01-01 00:00:00.000 + char timezone[64]; // tsTimezone + char locale[TSDB_LOCALE_LEN]; // tsLocale + char charset[TSDB_LOCALE_LEN]; // tsCharset } SClusterCfg; typedef struct SStatusMsg { uint32_t version; int32_t dnodeId; + uint32_t lastReboot; // time stamp for last reboot + int32_t openVnodes; + int32_t numOfCores; + float diskAvailable; + int8_t reserved[36]; char dnodeEp[TSDB_EP_LEN]; - uint32_t moduleStatus; - uint32_t lastReboot; // time stamp for last reboot - uint16_t reserve1; // from config file - uint16_t openVnodes; - uint16_t numOfCores; - float diskAvailable; // GB - char clusterId[TSDB_CLUSTER_ID_LEN]; - uint8_t alternativeRole; - uint8_t reserve2[15]; + int64_t clusterId; SClusterCfg clusterCfg; SVnodeLoad load[]; } SStatusMsg; typedef struct { - SMInfos mnodes; SDnodeCfg dnodeCfg; SVgroupAccess vgAccess[]; } SStatusRsp; @@ -860,9 +837,9 @@ typedef struct { } SCreateDnodeMsg, SDropDnodeMsg; typedef struct { - int32_t dnodeId; - char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port - SMInfos mnodes; + int32_t dnodeId; + int32_t mnodeNum; + SDnodeEp mnodeEps[]; } SCreateMnodeMsg; typedef struct { diff --git a/include/server/dnode/dnode.h b/include/server/dnode/dnode.h index d7aaa0e008..3499913afa 100644 --- a/include/server/dnode/dnode.h +++ b/include/server/dnode/dnode.h @@ -65,7 +65,7 @@ void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); * @param fqdn, the fqdn of dnode. * @param port, the port of dnode. */ -void dnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); +void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); #ifdef __cplusplus } diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index f15fc1792a..9bbc2e4b10 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -60,7 +60,7 @@ typedef struct { typedef struct { SMnodeFp fp; - char clusterId[TSDB_CLUSTER_ID_LEN]; + int64_t clusterId; int32_t dnodeId; } SMnodePara; diff --git a/source/client/CMakeLists.txt b/source/client/CMakeLists.txt index 2313c1633c..bc0d439407 100644 --- a/source/client/CMakeLists.txt +++ b/source/client/CMakeLists.txt @@ -7,4 +7,4 @@ target_include_directories( target_link_libraries( taos INTERFACE api -) \ No newline at end of file +) diff --git a/source/client/src/client.c b/source/client/src/client.c index 77c4aa1b2d..b1663239e6 100644 --- a/source/client/src/client.c +++ b/source/client/src/client.c @@ -19,3 +19,5 @@ // //} +int taos_init() { return 0; } +void taos_cleanup(void) {} diff --git a/source/libs/sync/src/sync.c b/source/libs/sync/src/sync.c index 35611f3da8..4b3ca11e4b 100644 --- a/source/libs/sync/src/sync.c +++ b/source/libs/sync/src/sync.c @@ -13,4 +13,7 @@ * along with this program. If not, see . */ -#include "sync.h" \ No newline at end of file +#include "sync.h" + +int32_t syncInit() {return 0;} +void syncCleanUp() {} \ No newline at end of file diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index 6dea4a4e57..1e74c1613c 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -11,4 +11,9 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "wal.h" + +int32_t walInit() {return 0;} +void walCleanUp() {} \ No newline at end of file diff --git a/source/server/dnode/CMakeLists.txt b/source/server/dnode/CMakeLists.txt index e627ca94e8..e1462ab3c8 100644 --- a/source/server/dnode/CMakeLists.txt +++ b/source/server/dnode/CMakeLists.txt @@ -5,6 +5,9 @@ target_link_libraries( PUBLIC cjson PUBLIC mnode PUBLIC vnode + PUBLIC wal + PUBLIC sync + PUBLIC taos ) target_include_directories( dnode diff --git a/source/server/dnode/inc/dnodeDnodeEps.h b/source/server/dnode/inc/dnodeDnodeEps.h deleted file mode 100644 index 01023da7fc..0000000000 --- a/source/server/dnode/inc/dnodeDnodeEps.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DNODE_DNODE_EPS_H_ -#define _TD_DNODE_DNODE_EPS_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "dnodeInt.h" - -int32_t dnodeInitDnodeEps(); -void dnodeCleanupDnodeEps(); -void dnodeUpdateDnodeEps(SDnodeEps *data); -bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr); -void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DNODE_DNODE_EPS_H_*/ \ No newline at end of file diff --git a/source/server/dnode/inc/dnodeCfg.h b/source/server/dnode/inc/dnodeEps.h similarity index 62% rename from source/server/dnode/inc/dnodeCfg.h rename to source/server/dnode/inc/dnodeEps.h index eda6231579..ac68d16374 100644 --- a/source/server/dnode/inc/dnodeCfg.h +++ b/source/server/dnode/inc/dnodeEps.h @@ -13,25 +13,30 @@ * along with this program. If not, see . */ -#ifndef _TD_DNODE_CFG_H_ -#define _TD_DNODE_CFG_H_ +#ifndef _TD_DNODE_EPS_H_ +#define _TD_DNODE_EPS_H_ #ifdef __cplusplus extern "C" { #endif #include "dnodeInt.h" +int32_t dnodeInitEps(); +void dnodeCleanupEps(); -int32_t dnodeInitCfg(); -void dnodeCleanupCfg(); void dnodeUpdateCfg(SDnodeCfg *data); +void dnodeUpdateDnodeEps(SDnodeEps *data); +void dnodeUpdateMnodeEps(SRpcEpSet *pEpSet); int32_t dnodeGetDnodeId(); -void dnodeGetClusterId(char *clusterId); -void dnodeGetCfg(int32_t *dnodeId, char *clusterId); -void dnodeSetDropped(); +int64_t dnodeGetClusterId(); +void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); + +void dnodeGetEpSetForPeer(SRpcEpSet *epSet); +void dnodeGetEpSetForShell(SRpcEpSet *epSet); +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus } #endif -#endif /*_TD_DNODE_CFG_H_*/ +#endif /*_TD_DNODE_EPS_H_*/ \ No newline at end of file diff --git a/source/server/dnode/inc/dnodeInt.h b/source/server/dnode/inc/dnodeInt.h index 38eac2794e..85f474a391 100644 --- a/source/server/dnode/inc/dnodeInt.h +++ b/source/server/dnode/inc/dnodeInt.h @@ -24,6 +24,7 @@ extern "C" { #include "tglobal.h" #include "tlog.h" #include "trpc.h" +#include "ttimer.h" #include "dnode.h" extern int32_t dDebugFlag; @@ -35,6 +36,12 @@ extern int32_t dDebugFlag; #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} +typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat; + +EDnStat dnodeGetRunStat(); +void dnodeSetRunStat(); +void dnodeGetStartup(SStartupStep *); + #ifdef __cplusplus } #endif diff --git a/source/server/dnode/inc/dnodeMain.h b/source/server/dnode/inc/dnodeMain.h deleted file mode 100644 index b3aeeceb44..0000000000 --- a/source/server/dnode/inc/dnodeMain.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DNODE_MAIN_H_ -#define _TD_DNODE_MAIN_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "dnodeInt.h" - -typedef enum { - DN_RUN_STAT_INIT, - DN_RUN_STAT_RUNNING, - DN_RUN_STAT_STOPPED -} EDnStat; - -int32_t dnodeInitMain(); -void dnodeCleanupMain(); -int32_t dnodeInitStorage(); -void dnodeCleanupStorage(); -void dnodeReportStartup(char *name, char *desc); -void dnodeReportStartupFinished(char *name, char *desc); -void dnodeProcessStartupReq(SRpcMsg *pMsg); -void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); -void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); -EDnStat dnodeGetRunStat(); -void dnodeSetRunStat(); -void* dnodeGetTimer(); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DNODE_MAIN_H_*/ diff --git a/source/server/dnode/inc/dnodeMnodeEps.h b/source/server/dnode/inc/dnodeMnodeEps.h deleted file mode 100644 index c890f6921d..0000000000 --- a/source/server/dnode/inc/dnodeMnodeEps.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DNODE_MNODE_EP_H_ -#define _TD_DNODE_MNODE_EP_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "dnodeInt.h" - -int32_t dnodeInitMnodeEps(); -void dnodeCleanupMnodeEps(); -void dnodeUpdateMnodeFromStatus(SMInfos *pMinfos); -void dnodeUpdateMnodeFromPeer(SRpcEpSet *pEpSet); -void dnodeGetEpSetForPeer(SRpcEpSet *epSet); -void dnodeGetEpSetForShell(SRpcEpSet *epSet); -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DNODE_MNODE_EP_H_*/ diff --git a/source/server/dnode/inc/dnodeStatus.h b/source/server/dnode/inc/dnodeMsg.h similarity index 81% rename from source/server/dnode/inc/dnodeStatus.h rename to source/server/dnode/inc/dnodeMsg.h index f0473b93f1..0790fa7e3e 100644 --- a/source/server/dnode/inc/dnodeStatus.h +++ b/source/server/dnode/inc/dnodeMsg.h @@ -21,9 +21,12 @@ extern "C" { #endif #include "dnodeInt.h" -int32_t dnodeInitStatus(); -void dnodeCleanupStatus(); +int32_t dnodeInitMsg(); +void dnodeCleanupMsg(); void dnodeProcessStatusRsp(SRpcMsg *pMsg); +void dnodeProcessStartupReq(SRpcMsg *pMsg); +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); +void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/server/dnode/src/dnodeCfg.c b/source/server/dnode/src/dnodeCfg.c deleted file mode 100644 index f9ed491464..0000000000 --- a/source/server/dnode/src/dnodeCfg.c +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "cJSON.h" -#include "tglobal.h" -#include "dnodeCfg.h" - -static struct DnCfg { - int32_t dnodeId; - int32_t dropped; - char clusterId[TSDB_CLUSTER_ID_LEN]; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} tsDcfg; - -static int32_t dnodeReadCfg() { - int32_t len = 0; - int32_t maxLen = 200; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; - - fp = fopen(tsDcfg.file, "r"); - if (!fp) { - dDebug("file %s not exist", tsDcfg.file); - goto PARSE_CFG_OVER; - } - - len = (int32_t)fread(content, 1, maxLen, fp); - if (len <= 0) { - dError("failed to read %s since content is null", tsDcfg.file); - goto PARSE_CFG_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", tsDcfg.file); - goto PARSE_CFG_OVER; - } - - cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read %s since dnodeId not found", tsDcfg.file); - goto PARSE_CFG_OVER; - } - tsDcfg.dnodeId = (int32_t)dnodeId->valueint; - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", tsDcfg.file); - goto PARSE_CFG_OVER; - } - tsDcfg.dropped = (int32_t)dropped->valueint; - - cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); - if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", tsDcfg.file); - goto PARSE_CFG_OVER; - } - tstrncpy(tsDcfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN); - - dInfo("successed to read %s", tsDcfg.file); - -PARSE_CFG_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (fp != NULL) fclose(fp); - terrno = 0; - - return 0; -} - -static int32_t dnodeWriteCfg() { - FILE *fp = fopen(tsDcfg.file, "w"); - if (!fp) { - dError("failed to write %s since %s", tsDcfg.file, strerror(errno)); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 200; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsDcfg.dnodeId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", tsDcfg.dropped); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsDcfg.clusterId); - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - taosFsyncFile(fileno(fp)); - fclose(fp); - free(content); - terrno = 0; - - dInfo("successed to write %s", tsDcfg.file); - return 0; -} - -int32_t dnodeInitCfg() { - tsDcfg.dnodeId = 0; - tsDcfg.dropped = 0; - tsDcfg.clusterId[0] = 0; - snprintf(tsDcfg.file, sizeof(tsDcfg.file), "%s/dnodeCfg.json", tsDnodeDir); - pthread_mutex_init(&tsDcfg.mutex, NULL); - - int32_t ret = dnodeReadCfg(); - if (ret == 0) { - dInfo("dnode cfg is initialized"); - } - - if (tsDcfg.dropped) { - dInfo("dnode is dropped and start to exit"); - return -1; - } - - return ret; -} - -void dnodeCleanupCfg() { - pthread_mutex_destroy(&tsDcfg.mutex); -} - -void dnodeUpdateCfg(SDnodeCfg *data) { - if (tsDcfg.dnodeId != 0) return; - - pthread_mutex_lock(&tsDcfg.mutex); - - tsDcfg.dnodeId = data->dnodeId; - tstrncpy(tsDcfg.clusterId, data->clusterId, TSDB_CLUSTER_ID_LEN); - dInfo("dnodeId is set to %d, clusterId is set to %s", data->dnodeId, data->clusterId); - - dnodeWriteCfg(); - pthread_mutex_unlock(&tsDcfg.mutex); -} - -void dnodeSetDropped() { - pthread_mutex_lock(&tsDcfg.mutex); - tsDcfg.dropped = 1; - dnodeWriteCfg(); - pthread_mutex_unlock(&tsDcfg.mutex); -} - -int32_t dnodeGetDnodeId() { - int32_t dnodeId = 0; - pthread_mutex_lock(&tsDcfg.mutex); - dnodeId = tsDcfg.dnodeId; - pthread_mutex_unlock(&tsDcfg.mutex); - return dnodeId; -} - -void dnodeGetClusterId(char *clusterId) { - pthread_mutex_lock(&tsDcfg.mutex); - tstrncpy(clusterId, tsDcfg.clusterId, TSDB_CLUSTER_ID_LEN); - pthread_mutex_unlock(&tsDcfg.mutex); -} - -void dnodeGetCfg(int32_t *dnodeId, char *clusterId) { - pthread_mutex_lock(&tsDcfg.mutex); - *dnodeId = tsDcfg.dnodeId; - tstrncpy(clusterId, tsDcfg.clusterId, TSDB_CLUSTER_ID_LEN); - pthread_mutex_unlock(&tsDcfg.mutex); -} diff --git a/source/server/dnode/src/dnodeDnodeEps.c b/source/server/dnode/src/dnodeDnodeEps.c deleted file mode 100644 index 15c084439f..0000000000 --- a/source/server/dnode/src/dnodeDnodeEps.c +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "cJSON.h" -#include "thash.h" -#include "tglobal.h" -#include "dnodeDnodeEps.h" -#include "dnodeCfg.h" - -static struct { - int32_t dnodeId; - int32_t dnodeNum; - SDnodeEp * dnodeList; - SHashObj * dnodeHash; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} tsDeps; - -static void dnodePrintEps() { - dDebug("print dnodeEp, dnodeNum:%d", tsDeps.dnodeNum); - for (int32_t i = 0; i < tsDeps.dnodeNum; i++) { - SDnodeEp *ep = &tsDeps.dnodeList[i]; - dDebug("dnode:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort); - } -} - -static void dnodeResetEps(SDnodeEps *data) { - assert(data != NULL); - - if (data->dnodeNum > tsDeps.dnodeNum) { - SDnodeEp *tmp = calloc(data->dnodeNum, sizeof(SDnodeEp)); - if (tmp == NULL) return; - - tfree(tsDeps.dnodeList); - tsDeps.dnodeList = tmp; - tsDeps.dnodeNum = data->dnodeNum; - memcpy(tsDeps.dnodeList, data->dnodeEps, tsDeps.dnodeNum * sizeof(SDnodeEp)); - dnodePrintEps(); - - for (int32_t i = 0; i < tsDeps.dnodeNum; ++i) { - SDnodeEp *ep = &tsDeps.dnodeList[i]; - taosHashPut(tsDeps.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); - } - } -} - -static int32_t dnodeReadEps() { - int32_t len = 0; - int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; - - fp = fopen(tsDeps.file, "r"); - if (!fp) { - dDebug("file %s not exist", tsDeps.file); - goto PRASE_EPS_OVER; - } - - len = (int32_t)fread(content, 1, maxLen, fp); - if (len <= 0) { - dError("failed to read %s since content is null", tsDeps.file); - goto PRASE_EPS_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", tsDeps.file); - goto PRASE_EPS_OVER; - } - - cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum"); - if (!dnodeNum || dnodeNum->type != cJSON_Number) { - dError("failed to read %s since dnodeNum not found", tsDeps.file); - goto PRASE_EPS_OVER; - } - - cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); - if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", tsDeps.file); - goto PRASE_EPS_OVER; - } - - int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); - if (dnodeInfosSize != dnodeNum->valueint) { - dError("failed to read %s since dnodeInfos size:%d not matched dnodeNum:%d", tsDeps.file, dnodeInfosSize, - (int32_t)dnodeNum->valueint); - goto PRASE_EPS_OVER; - } - - tsDeps.dnodeNum = dnodeInfosSize; - tsDeps.dnodeList = calloc(dnodeInfosSize, sizeof(SDnodeEp)); - if (tsDeps.dnodeList == NULL) { - dError("failed to calloc dnodeEpList since %s", strerror(errno)); - goto PRASE_EPS_OVER; - } - - for (int32_t i = 0; i < dnodeInfosSize; ++i) { - cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); - if (dnodeInfo == NULL) break; - - SDnodeEp *ep = &tsDeps.dnodeList[i]; - - cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); - if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read %s, dnodeId not found", tsDeps.file); - goto PRASE_EPS_OVER; - } - ep->dnodeId = (int32_t)dnodeId->valueint; - - cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); - if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", tsDeps.file); - goto PRASE_EPS_OVER; - } - tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); - - cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); - if (!dnodePort || dnodePort->type != cJSON_Number) { - dError("failed to read %s, dnodePort not found", tsDeps.file); - goto PRASE_EPS_OVER; - } - ep->dnodePort = (uint16_t)dnodePort->valueint; - } - - dInfo("succcessed to read file %s", tsDeps.file); - dnodePrintEps(); - -PRASE_EPS_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (fp != NULL) fclose(fp); - - if (dnodeIsDnodeEpChanged(tsDeps.dnodeId, tsLocalEp)) { - dError("dnode:%d, localEp different from %s dnodeEps.json and need reconfigured", tsDeps.dnodeId, tsLocalEp); - return -1; - } - - terrno = 0; - return 0; -} - -static int32_t dnodeWriteEps() { - FILE *fp = fopen(tsDeps.file, "w"); - if (!fp) { - dError("failed to write %s since %s", tsDeps.file, strerror(errno)); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 30000; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsDeps.dnodeNum); - len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < tsDeps.dnodeNum; ++i) { - SDnodeEp *ep = &tsDeps.dnodeList[i]; - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId); - len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); - len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort); - if (i < tsDeps.dnodeNum - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - taosFsyncFile(fileno(fp)); - fclose(fp); - free(content); - terrno = 0; - - dInfo("successed to write %s", tsDeps.file); - return 0; -} - -int32_t dnodeInitDnodeEps() { - tsDeps.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (tsDeps.dnodeHash == NULL) return -1; - - tsDeps.dnodeId = dnodeGetDnodeId(); - tsDeps.dnodeNum = 0; - snprintf(tsDeps.file, sizeof(tsDeps.file), "%s/dnodeEps.json", tsDnodeDir); - pthread_mutex_init(&tsDeps.mutex, NULL); - - int32_t ret = dnodeReadEps(); - if (ret == 0) { - dInfo("dnode eps is initialized"); - } - - return ret; -} - -void dnodeCleanupDnodeEps() { - pthread_mutex_lock(&tsDeps.mutex); - - if (tsDeps.dnodeList != NULL) { - free(tsDeps.dnodeList); - tsDeps.dnodeList = NULL; - } - - if (tsDeps.dnodeHash) { - taosHashCleanup(tsDeps.dnodeHash); - tsDeps.dnodeHash = NULL; - } - - tsDeps.dnodeNum = 0; - pthread_mutex_unlock(&tsDeps.mutex); - pthread_mutex_destroy(&tsDeps.mutex); -} - -void dnodeUpdateDnodeEps(SDnodeEps *data) { - if (data == NULL || data->dnodeNum <= 0) return; - - data->dnodeNum = htonl(data->dnodeNum); - for (int32_t i = 0; i < data->dnodeNum; ++i) { - data->dnodeEps[i].dnodeId = htonl(data->dnodeEps[i].dnodeId); - data->dnodeEps[i].dnodePort = htons(data->dnodeEps[i].dnodePort); - } - - pthread_mutex_lock(&tsDeps.mutex); - - if (data->dnodeNum != tsDeps.dnodeNum) { - dnodeResetEps(data); - dnodeWriteEps(); - } else { - int32_t size = data->dnodeNum * sizeof(SDnodeEp); - if (memcmp(tsDeps.dnodeList, data->dnodeEps, size) != 0) { - dnodeResetEps(data); - dnodeWriteEps(); - } - } - - pthread_mutex_unlock(&tsDeps.mutex); -} - -bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { - bool changed = false; - - pthread_mutex_lock(&tsDeps.mutex); - - SDnodeEp *ep = taosHashGet(tsDeps.dnodeHash, &dnodeId, sizeof(int32_t)); - if (ep != NULL) { - char epSaved[TSDB_EP_LEN + 1]; - snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); - changed = strcmp(epstr, epSaved) != 0; - tstrncpy(epstr, epSaved, TSDB_EP_LEN); - } - - pthread_mutex_unlock(&tsDeps.mutex); - - return changed; -} - -void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { - pthread_mutex_lock(&tsDeps.mutex); - - SDnodeEp *ep = taosHashGet(tsDeps.dnodeHash, &dnodeId, sizeof(int32_t)); - if (ep != NULL) { - if (port) *port = ep->dnodePort; - if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); - if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); - } - - pthread_mutex_unlock(&tsDeps.mutex); -} diff --git a/source/server/dnode/src/dnodeEps.c b/source/server/dnode/src/dnodeEps.c new file mode 100644 index 0000000000..5b843df2f2 --- /dev/null +++ b/source/server/dnode/src/dnodeEps.c @@ -0,0 +1,415 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dnodeEps.h" +#include "cJSON.h" +#include "thash.h" + +static struct { + int32_t dnodeId; + int32_t dropped; + int64_t clusterId; + SDnodeEps *dnodeEps; + SHashObj *dnodeHash; + SRpcEpSet mnodeEpSetForShell; + SRpcEpSet mnodeEpSetForPeer; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} tsEps; + +void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsEps.mutex); + *epSet = tsEps.mnodeEpSetForPeer; + pthread_mutex_unlock(&tsEps.mutex); +} + +void dnodeGetEpSetForShell(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsEps.mutex); + *epSet = tsEps.mnodeEpSetForShell; + pthread_mutex_unlock(&tsEps.mutex); +} + +void dnodeUpdateMnodeEps(SRpcEpSet *ep) { + if (ep != NULL || ep->numOfEps <= 0) { + dError("mnode is changed, but content is invalid, discard it"); + return; + } + + pthread_mutex_lock(&tsEps.mutex); + + dInfo("mnode is changed, num:%d use:%d", ep->numOfEps, ep->inUse); + + tsEps.mnodeEpSetForPeer = *ep; + for (int32_t i = 0; i < ep->numOfEps; ++i) { + ep->port[i] -= TSDB_PORT_DNODEDNODE; + dInfo("mnode index:%d %s:%u", i, ep->fqdn[i], ep->port[i]); + } + tsEps.mnodeEpSetForShell = *ep; + + pthread_mutex_unlock(&tsEps.mutex); +} + +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { + SRpcConnInfo connInfo = {0}; + rpcGetConnInfo(rpcMsg->handle, &connInfo); + + SRpcEpSet epSet = {0}; + if (forShell) { + dnodeGetEpSetForShell(&epSet); + } else { + dnodeGetEpSetForPeer(&epSet); + } + + dDebug("msg:%s will be redirected, num:%d use:%d", taosMsg[rpcMsg->msgType], epSet.numOfEps, epSet.inUse); + + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); + if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) { + if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) || + (epSet.port[i] == tsServerPort && forShell)) { + epSet.inUse = (i + 1) % epSet.numOfEps; + dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); + } + } + + epSet.port[i] = htons(epSet.port[i]); + } + + rpcSendRedirectRsp(rpcMsg->handle, &epSet); +} + +static void dnodePrintEps() { + dDebug("print dnode list, num:%d", tsEps.dnodeEps->dnodeNum); + for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; + dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort, ep->isMnode); + } +} + +static void dnodeResetEps(SDnodeEps *data) { + assert(data != NULL); + + int32_t size = sizeof(SDnodeEps) + data->dnodeNum * sizeof(SDnodeEp); + + if (data->dnodeNum > tsEps.dnodeEps->dnodeNum) { + SDnodeEps *tmp = calloc(1, size); + if (tmp == NULL) return; + + tfree(tsEps.dnodeEps); + tsEps.dnodeEps = tmp; + } + + if (tsEps.dnodeEps != data) { + memcpy(tsEps.dnodeEps, data, size); + } + + tsEps.mnodeEpSetForPeer.inUse = 0; + tsEps.mnodeEpSetForShell.inUse = 0; + int32_t index = 0; + for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; i++) { + SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; + if (!ep->isMnode) continue; + if (index >= TSDB_MAX_REPLICA) continue; + strcpy(tsEps.mnodeEpSetForShell.fqdn[index], ep->dnodeFqdn); + strcpy(tsEps.mnodeEpSetForPeer.fqdn[index], ep->dnodeFqdn); + tsEps.mnodeEpSetForShell.port[index] = ep->dnodePort; + tsEps.mnodeEpSetForShell.port[index] = ep->dnodePort + tsDnodeDnodePort; + index++; + } + + for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; + taosHashPut(tsEps.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); + } + + dnodePrintEps(); +} + +static bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { + bool changed = false; + + pthread_mutex_lock(&tsEps.mutex); + + SDnodeEp *ep = taosHashGet(tsEps.dnodeHash, &dnodeId, sizeof(int32_t)); + if (ep != NULL) { + char epSaved[TSDB_EP_LEN + 1]; + snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); + changed = strcmp(epstr, epSaved) != 0; + tstrncpy(epstr, epSaved, TSDB_EP_LEN); + } + + pthread_mutex_unlock(&tsEps.mutex); + + return changed; +} + +static int32_t dnodeReadEps() { + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + FILE *fp = NULL; + + fp = fopen(tsEps.file, "r"); + if (!fp) { + dDebug("file %s not exist", tsEps.file); + goto PRASE_EPS_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", tsEps.file); + goto PRASE_EPS_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", tsEps.file); + goto PRASE_EPS_OVER; + } + + cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_String) { + dError("failed to read %s since dnodeId not found", tsEps.file); + goto PRASE_EPS_OVER; + } + tsEps.dnodeId = atoi(dnodeId->valuestring); + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_String) { + dError("failed to read %s since dropped not found", tsEps.file); + goto PRASE_EPS_OVER; + } + tsEps.dropped = atoi(dropped->valuestring); + + cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); + if (!clusterId || clusterId->type != cJSON_String) { + dError("failed to read %s since clusterId not found", tsEps.file); + goto PRASE_EPS_OVER; + } + tsEps.clusterId = atoll(clusterId->valuestring); + + cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); + if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { + dError("failed to read %s since dnodeInfos not found", tsEps.file); + goto PRASE_EPS_OVER; + } + + int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); + if (dnodeInfosSize <= 0) { + dError("failed to read %s since dnodeInfos size:%d invalid", tsEps.file, dnodeInfosSize); + goto PRASE_EPS_OVER; + } + + tsEps.dnodeEps = calloc(1, dnodeInfosSize * sizeof(SDnodeEp) + sizeof(SDnodeEps)); + if (tsEps.dnodeEps == NULL) { + dError("failed to calloc dnodeEpList since %s", strerror(errno)); + goto PRASE_EPS_OVER; + } + tsEps.dnodeEps->dnodeNum = dnodeInfosSize; + + for (int32_t i = 0; i < dnodeInfosSize; ++i) { + cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); + if (dnodeInfo == NULL) break; + + SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; + + cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); + if (!dnodeId || dnodeId->type != cJSON_String) { + dError("failed to read %s, dnodeId not found", tsEps.file); + goto PRASE_EPS_OVER; + } + ep->dnodeId = atoi(dnodeId->valuestring); + + cJSON *isMnode = cJSON_GetObjectItem(dnodeInfo, "isMnode"); + if (!isMnode || isMnode->type != cJSON_String) { + dError("failed to read %s, isMnode not found", tsEps.file); + goto PRASE_EPS_OVER; + } + ep->isMnode = atoi(isMnode->valuestring); + + cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); + if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { + dError("failed to read %s, dnodeFqdn not found", tsEps.file); + goto PRASE_EPS_OVER; + } + tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + + cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); + if (!dnodePort || dnodePort->type != cJSON_String) { + dError("failed to read %s, dnodePort not found", tsEps.file); + goto PRASE_EPS_OVER; + } + ep->dnodePort = atoi(dnodePort->valuestring); + } + + dInfo("succcessed to read file %s", tsEps.file); + dnodePrintEps(); + +PRASE_EPS_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + if (dnodeIsDnodeEpChanged(tsEps.dnodeId, tsLocalEp)) { + dError("dnode:%d, localEp %s different with dnodeEps.json and need reconfigured", tsEps.dnodeId, tsLocalEp); + return -1; + } + + dnodeResetEps(tsEps.dnodeEps); + + terrno = 0; + return 0; +} + +static int32_t dnodeWriteEps() { + FILE *fp = fopen(tsEps.file, "w"); + if (!fp) { + dError("failed to write %s since %s", tsEps.file, strerror(errno)); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 30000; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", tsEps.dnodeId); + len += snprintf(content + len, maxLen - len, " \"dropped\": \"%d\",\n", tsEps.dropped); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", tsEps.clusterId); + len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); + for (int32_t i = 0; i < tsEps.dnodeEps->dnodeNum; ++i) { + SDnodeEp *ep = &tsEps.dnodeEps->dnodeEps[i]; + len += snprintf(content + len, maxLen - len, " \"dnodeId\": \"%d\",\n", ep->dnodeId); + len += snprintf(content + len, maxLen - len, " \"isMnode\": \"%d\",\n", ep->isMnode); + len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); + len += snprintf(content + len, maxLen - len, " \"dnodePort\": \"%u\"\n", ep->dnodePort); + if (i < tsEps.dnodeEps->dnodeNum - 1) { + len += snprintf(content + len, maxLen - len, " },{\n"); + } else { + len += snprintf(content + len, maxLen - len, " }]\n"); + } + } + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + terrno = 0; + + dInfo("successed to write %s", tsEps.file); + return 0; +} + +int32_t dnodeInitEps() { + tsEps.dnodeId = 0; + tsEps.dropped = 0; + tsEps.clusterId = 0; + tsEps.dnodeEps = NULL; + snprintf(tsEps.file, sizeof(tsEps.file), "%s/dnodeEps.json", tsDnodeDir); + pthread_mutex_init(&tsEps.mutex, NULL); + + tsEps.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsEps.dnodeHash == NULL) return -1; + + int32_t ret = dnodeReadEps(); + if (ret == 0) { + dInfo("dnode eps is initialized"); + } + + return ret; +} + +void dnodeCleanupEps() { + pthread_mutex_lock(&tsEps.mutex); + + if (tsEps.dnodeEps != NULL) { + free(tsEps.dnodeEps); + tsEps.dnodeEps = NULL; + } + + if (tsEps.dnodeHash) { + taosHashCleanup(tsEps.dnodeHash); + tsEps.dnodeHash = NULL; + } + + pthread_mutex_unlock(&tsEps.mutex); + pthread_mutex_destroy(&tsEps.mutex); +} + +void dnodeUpdateDnodeEps(SDnodeEps *data) { + if (data == NULL || data->dnodeNum <= 0) return; + + pthread_mutex_lock(&tsEps.mutex); + + if (data->dnodeNum != tsEps.dnodeEps->dnodeNum) { + dnodeResetEps(data); + dnodeWriteEps(); + } else { + int32_t size = data->dnodeNum * sizeof(SDnodeEp) + sizeof(SDnodeEps); + if (memcmp(tsEps.dnodeEps, data, size) != 0) { + dnodeResetEps(data); + dnodeWriteEps(); + } + } + + pthread_mutex_unlock(&tsEps.mutex); +} + +void dnodeGetEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { + pthread_mutex_lock(&tsEps.mutex); + + SDnodeEp *ep = taosHashGet(tsEps.dnodeHash, &dnodeId, sizeof(int32_t)); + if (ep != NULL) { + if (port) *port = ep->dnodePort; + if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); + if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); + } + + pthread_mutex_unlock(&tsEps.mutex); +} + +void dnodeUpdateCfg(SDnodeCfg *data) { + if (tsEps.dnodeId != 0 && !data->dropped) return; + + pthread_mutex_lock(&tsEps.mutex); + + tsEps.dnodeId = data->dnodeId; + tsEps.clusterId = data->clusterId; + tsEps.dropped = data->dropped; + dInfo("dnodeId is set to %d, clusterId is set to %" PRId64, data->dnodeId, data->clusterId); + + dnodeWriteEps(); + pthread_mutex_unlock(&tsEps.mutex); +} + +int32_t dnodeGetDnodeId() { + int32_t dnodeId = 0; + pthread_mutex_lock(&tsEps.mutex); + dnodeId = tsEps.dnodeId; + pthread_mutex_unlock(&tsEps.mutex); + return dnodeId; +} + +int64_t dnodeGetClusterId() { + int64_t clusterId = 0; + pthread_mutex_lock(&tsEps.mutex); + clusterId = tsEps.clusterId; + pthread_mutex_unlock(&tsEps.mutex); + return clusterId; +} diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c index 5f0dcc630b..d294143e57 100644 --- a/source/server/dnode/src/dnodeInt.c +++ b/source/server/dnode/src/dnodeInt.c @@ -14,67 +14,176 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#if 0 -#include "qScript.h" -#include "tfile.h" -#include "tsync.h" -#include "twal.h" -#endif -#include "tstep.h" -#include "dnodeCfg.h" #include "dnodeCheck.h" -#include "dnodeDnodeEps.h" -#include "dnodeMain.h" -#include "dnodeMnodeEps.h" -#include "dnodeStatus.h" +#include "dnodeEps.h" +#include "dnodeMsg.h" #include "dnodeTrans.h" #include "mnode.h" +#include "sync.h" +#include "tcache.h" +#include "tconfig.h" +#include "tnote.h" +#include "tstep.h" #include "vnode.h" +#include "wal.h" -static struct SSteps *tsSteps; +static struct { + EDnStat runStatus; + SStartupStep startup; + SSteps *steps; +} tsDnode; -static int32_t dnodeInitVnodeModule(void **unused) { +EDnStat dnodeGetRunStat() { return tsDnode.runStatus; } + +void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; } + +static void dnodeReportStartup(char *name, char *desc) { + SStartupStep *startup = &tsDnode.startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 0; +} + +static void dnodeReportStartupFinished(char *name, char *desc) { + SStartupStep *startup = &tsDnode.startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 1; +} + +void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); } + +static int32_t dnodeInitVnode() { SVnodePara para; - para.fp.GetDnodeEp = dnodeGetDnodeEp; + para.fp.GetDnodeEp = dnodeGetEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; return vnodeInit(para); } -static int32_t dnodeInitMnodeModule(void **unused) { +static int32_t dnodeInitMnode() { SMnodePara para; - para.fp.GetDnodeEp = dnodeGetDnodeEp; + para.fp.GetDnodeEp = dnodeGetEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendRedirectMsg = dnodeSendRedirectMsg; - dnodeGetCfg(¶.dnodeId, para.clusterId); + para.dnodeId = dnodeGetDnodeId(); + para.clusterId = dnodeGetClusterId(); return mnodeInit(para); } +static int32_t dnodeInitTfs() {} + +static int32_t dnodeInitMain() { + tsDnode.runStatus = DN_RUN_STAT_STOPPED; + tscEmbedded = 1; + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + taosInitGlobalCfg(); + taosReadGlobalLogCfg(); + taosSetCoreDump(tsEnableCoreFile); + + if (!taosMkDir(tsLogDir)) { + printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); + return -1; + } + + char temp[TSDB_FILENAME_LEN]; + sprintf(temp, "%s/taosdlog", tsLogDir); + if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { + printf("failed to init log file\n"); + } + + if (!taosReadGlobalCfg()) { + taosPrintGlobalCfg(); + dError("TDengine read global config failed"); + return -1; + } + + dInfo("start to initialize TDengine"); + + taosInitNotes(); + + return taosCheckGlobalCfg(); +} + +static void dnodeCleanupMain() { + taos_cleanup(); + taosCloseLog(); + taosStopCacheRefreshWorker(); +} + +static int32_t dnodeCheckRunning(char *dir) { + char filepath[256] = {0}; + snprintf(filepath, sizeof(filepath), "%s/.running", dir); + + FileFd fd = taosOpenFileCreateWriteTrunc(filepath); + if (fd < 0) { + dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno)); + return -1; + } + + int32_t ret = taosLockFile(fd); + if (ret != 0) { + dError("failed to lock file:%s since %s, quit", filepath, strerror(errno)); + taosCloseFile(fd); + return -1; + } + + return 0; +} + +static int32_t dnodeInitDir() { + sprintf(tsMnodeDir, "%s/mnode", tsDataDir); + sprintf(tsVnodeDir, "%s/vnode", tsDataDir); + sprintf(tsDnodeDir, "%s/dnode", tsDataDir); + + if (!taosMkDir(tsDnodeDir)) { + dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); + return -1; + } + + if (!taosMkDir(tsMnodeDir)) { + dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); + return -1; + } + + if (!taosMkDir(tsVnodeDir)) { + dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno)); + return -1; + } + + if (dnodeCheckRunning(tsDnodeDir) != 0) { + return -1; + } + + return 0; +} + +static void dnodeCleanupDir() {} + int32_t dnodeInit() { - tsSteps = taosStepInit(24, dnodeReportStartup); - if (tsSteps == NULL) return -1; + SSteps *steps = taosStepInit(24, dnodeReportStartup); + if (steps == NULL) return -1; - taosStepAdd(tsSteps, "dnode-main", dnodeInitMain, dnodeCleanupMain); - taosStepAdd(tsSteps, "dnode-storage", dnodeInitStorage, dnodeCleanupStorage); - //taosStepAdd(tsSteps, "dnode-tfs", tfInit, tfCleanup); - taosStepAdd(tsSteps, "dnode-rpc", rpcInit, rpcCleanup); - taosStepAdd(tsSteps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck); - taosStepAdd(tsSteps, "dnode-cfg", dnodeInitCfg, dnodeCleanupCfg); - taosStepAdd(tsSteps, "dnode-deps", dnodeInitDnodeEps, dnodeCleanupDnodeEps); - taosStepAdd(tsSteps, "dnode-meps", dnodeInitMnodeEps, dnodeCleanupMnodeEps); - //taosStepAdd(tsSteps, "dnode-wal", walInit, walCleanUp); - //taosStepAdd(tsSteps, "dnode-sync", syncInit, syncCleanUp); - taosStepAdd(tsSteps, "dnode-vnode", dnodeInitVnodeModule, vnodeCleanup); - taosStepAdd(tsSteps, "dnode-mnode", dnodeInitMnodeModule, mnodeCleanup); - taosStepAdd(tsSteps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); - taosStepAdd(tsSteps, "dnode-status", dnodeInitStatus, dnodeCleanupStatus); - //taosStepAdd(tsSteps, "dnode-script",scriptEnvPoolInit, scriptEnvPoolCleanup); + taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); + taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir); + taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck); + taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); + taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL); + taosStepAdd(steps, "dnode-wal", walInit, walCleanUp); + taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp); + taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps); + taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup); + taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup); + taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); + taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg); - taosStepExec(tsSteps); + tsDnode.steps = steps; + taosStepExec(tsDnode.steps); dnodeSetRunStat(DN_RUN_STAT_RUNNING); dnodeReportStartupFinished("TDengine", "initialized successfully"); @@ -86,7 +195,7 @@ int32_t dnodeInit() { void dnodeCleanup() { if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) { dnodeSetRunStat(DN_RUN_STAT_STOPPED); - taosStepCleanup(tsSteps); - tsSteps = NULL; + taosStepCleanup(tsDnode.steps); + tsDnode.steps = NULL; } } diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c deleted file mode 100644 index 74309dda69..0000000000 --- a/source/server/dnode/src/dnodeMain.c +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "tcache.h" -#include "tconfig.h" -#include "tglobal.h" -#if 0 -#include "tfs.h" -#endif -#include "tnote.h" -#include "tcompression.h" -#include "ttimer.h" -#include "dnodeCfg.h" -#include "dnodeMain.h" -#include "mnode.h" - -static struct { - EDnStat runStatus; - void * dnodeTimer; - SStartupStep startup; -} tsDmain; - -static void dnodeCheckDataDirOpenned(char *dir) { -#if 0 - char filepath[256] = {0}; - snprintf(filepath, sizeof(filepath), "%s/.running", dir); - - int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); - if (fd < 0) { - dError("failed to open lock file:%s, reason: %s, quit", filepath, strerror(errno)); - exit(0); - } - - int32_t ret = flock(fd, LOCK_EX | LOCK_NB); - if (ret != 0) { - dError("failed to lock file:%s ret:%d since %s, database may be running, quit", filepath, ret, strerror(errno)); - close(fd); - exit(0); - } -#endif -} - -int32_t dnodeInitMain() { - tsDmain.runStatus = DN_RUN_STAT_STOPPED; - tsDmain.dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); - if (tsDmain.dnodeTimer == NULL) { - dError("failed to init dnode timer"); - return -1; - } - - tscEmbedded = 1; - taosIgnSIGPIPE(); - taosBlockSIGPIPE(); - taosResolveCRC(); - taosInitGlobalCfg(); - taosReadGlobalLogCfg(); - taosSetCoreDump(tsEnableCoreFile); - - if (!taosMkDir(tsLogDir)) { - printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); - return -1; - } - - char temp[TSDB_FILENAME_LEN]; - sprintf(temp, "%s/taosdlog", tsLogDir); - if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { - printf("failed to init log file\n"); - } - - if (!taosReadGlobalCfg()) { - taosPrintGlobalCfg(); - dError("TDengine read global config failed"); - return -1; - } - - dInfo("start to initialize TDengine"); - - taosInitNotes(); - - return taosCheckGlobalCfg(); -} - -void dnodeCleanupMain() { - if (tsDmain.dnodeTimer != NULL) { - taosTmrCleanUp(tsDmain.dnodeTimer); - tsDmain.dnodeTimer = NULL; - } - -#if 0 - taos_cleanup(); -#endif - taosCloseLog(); - taosStopCacheRefreshWorker(); -} - -int32_t dnodeInitStorage() { -#ifdef TD_TSZ - // compress module init - tsCompressInit(); -#endif - - // storage module init - if (tsDiskCfgNum == 1 && !taosMkDir(tsDataDir)) { - dError("failed to create dir:%s since %s", tsDataDir, strerror(errno)); - return -1; - } - -#if 0 - if (tfsInit(tsDiskCfg, tsDiskCfgNum) < 0) { - dError("failed to init TFS since %s", tstrerror(terrno)); - return -1; - } - - strncpy(tsDataDir, TFS_PRIMARY_PATH(), TSDB_FILENAME_LEN); -#endif - sprintf(tsMnodeDir, "%s/mnode", tsDataDir); - sprintf(tsVnodeDir, "%s/vnode", tsDataDir); - sprintf(tsDnodeDir, "%s/dnode", tsDataDir); - - if (!taosMkDir(tsMnodeDir)) { - dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); - return -1; - } - - if (!taosMkDir(tsDnodeDir)) { - dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); - return -1; - } - -#if 0 - if (tfsMkdir("vnode") < 0) { - dError("failed to create vnode dir since %s", tstrerror(terrno)); - return -1; - } - - if (tfsMkdir("vnode_bak") < 0) { - dError("failed to create vnode_bak dir since %s", tstrerror(terrno)); - return -1; - } - - TDIR *tdir = tfsOpendir("vnode_bak/.staging"); - bool stagingNotEmpty = tfsReaddir(tdir) != NULL; - tfsClosedir(tdir); - - if (stagingNotEmpty) { - dError("vnode_bak/.staging dir not empty, fix it first."); - return -1; - } - - if (tfsMkdir("vnode_bak/.staging") < 0) { - dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno)); - return -1; - } - - dnodeCheckDataDirOpenned(tsDnodeDir); - - taosGetDisk(); - dnodePrintDiskInfo(); -#endif - - dInfo("dnode storage is initialized at %s", tsDnodeDir); - return 0; -} - -void dnodeCleanupStorage() { -#if 0 - // storage destroy - tfsDestroy(); - - #ifdef TD_TSZ - // compress destroy - tsCompressExit(); - #endif -#endif -} - -void dnodeReportStartup(char *name, char *desc) { - SStartupStep *startup = &tsDmain.startup; - tstrncpy(startup->name, name, strlen(startup->name)); - tstrncpy(startup->desc, desc, strlen(startup->desc)); - startup->finished = 0; -} - -void dnodeReportStartupFinished(char *name, char *desc) { - SStartupStep *startup = &tsDmain.startup; - tstrncpy(startup->name, name, strlen(startup->name)); - tstrncpy(startup->desc, desc, strlen(startup->desc)); - startup->finished = 1; -} - -void dnodeProcessStartupReq(SRpcMsg *pMsg) { - dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); - - SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); - memcpy(pStep, &tsDmain.startup, sizeof(SStartupStep)); - - dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); - - SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; - rpcSendResponse(&rpcRsp); - rpcFreeCont(pMsg->pCont); -} - -static int32_t dnodeStartMnode(SRpcMsg *pMsg) { - SCreateMnodeMsg *pCfg = pMsg->pCont; - pCfg->dnodeId = htonl(pCfg->dnodeId); - if (pCfg->dnodeId != dnodeGetDnodeId()) { - dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, - dnodeGetDnodeId()); - return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; - } - - if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { - dDebug("dnodeEp:%s, in create meps msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); - return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; - } - - dDebug("dnode:%d, create meps msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum); - for (int32_t i = 0; i < pCfg->mnodes.mnodeNum; ++i) { - pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId); - dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp); - } - - if (mnodeGetStatus() == MN_STATUS_READY) return 0; - - return mnodeDeploy(); -} - -void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { - int32_t code = dnodeStartMnode(pMsg); - - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; - - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); -} - -void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { - SCfgDnodeMsg *pCfg = pMsg->pCont; - - int32_t code = taosCfgDynamicOptions(pCfg->config); - - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; - - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); -} - -EDnStat dnodeGetRunStat() { return tsDmain.runStatus; } - -void dnodeSetRunStat(EDnStat stat) { tsDmain.runStatus = stat; } - -void* dnodeGetTimer() { return tsDmain.dnodeTimer; } \ No newline at end of file diff --git a/source/server/dnode/src/dnodeMnodeEps.c b/source/server/dnode/src/dnodeMnodeEps.c deleted file mode 100644 index f34ee48238..0000000000 --- a/source/server/dnode/src/dnodeMnodeEps.c +++ /dev/null @@ -1,311 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "cJSON.h" -#include "tglobal.h" -#include "dnodeCfg.h" -#include "dnodeDnodeEps.h" -#include "dnodeMnodeEps.h" -#include "mnode.h" - -static struct { - SRpcEpSet mnodeEpSet; - SMInfos mnodeInfos; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} tsDmeps; - - -static void dnodePrintMnodeEps() { - SRpcEpSet *epset = &tsDmeps.mnodeEpSet; - dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse); - for (int32_t i = 0; i < epset->numOfEps; i++) { - dInfo("ep index:%d, %s:%u", i, epset->fqdn[i], epset->port[i]); - } -} - -static void dnodeResetMnodeEps(SMInfos *mInfos) { - if (mInfos == NULL || mInfos->mnodeNum == 0) { - tsDmeps.mnodeEpSet.numOfEps = 1; - taosGetFqdnPortFromEp(tsFirst, tsDmeps.mnodeEpSet.fqdn[0], &tsDmeps.mnodeEpSet.port[0]); - - if (strcmp(tsSecond, tsFirst) != 0) { - tsDmeps.mnodeEpSet.numOfEps = 2; - taosGetFqdnPortFromEp(tsSecond, tsDmeps.mnodeEpSet.fqdn[1], &tsDmeps.mnodeEpSet.port[1]); - } - dnodePrintMnodeEps(); - return; - } - - int32_t size = sizeof(SMInfos); - memcpy(&tsDmeps.mnodeInfos, mInfos, size); - - tsDmeps.mnodeEpSet.inUse = tsDmeps.mnodeInfos.inUse; - tsDmeps.mnodeEpSet.numOfEps = tsDmeps.mnodeInfos.mnodeNum; - for (int32_t i = 0; i < tsDmeps.mnodeInfos.mnodeNum; i++) { - taosGetFqdnPortFromEp(tsDmeps.mnodeInfos.mnodeInfos[i].mnodeEp, tsDmeps.mnodeEpSet.fqdn[i], &tsDmeps.mnodeEpSet.port[i]); - } - - dnodePrintMnodeEps(); -} - -static int32_t dnodeWriteMnodeEps() { - FILE *fp = fopen(tsDmeps.file, "w"); - if (!fp) { - dError("failed to write %s since %s", tsDmeps.file, strerror(errno)); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 2000; - char * content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDmeps.mnodeInfos.inUse); - len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDmeps.mnodeInfos.mnodeNum); - len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); - for (int32_t i = 0; i < tsDmeps.mnodeInfos.mnodeNum; i++) { - len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDmeps.mnodeInfos.mnodeInfos[i].mnodeId); - len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDmeps.mnodeInfos.mnodeInfos[i].mnodeEp); - if (i < tsDmeps.mnodeInfos.mnodeNum - 1) { - len += snprintf(content + len, maxLen - len, " },{\n"); - } else { - len += snprintf(content + len, maxLen - len, " }]\n"); - } - } - len += snprintf(content + len, maxLen - len, "}\n"); - - fwrite(content, 1, len, fp); - taosFsyncFile(fileno(fp)); - fclose(fp); - free(content); - terrno = 0; - - dInfo("successed to write %s", tsDmeps.file); - return 0; -} - -static int32_t dnodeReadMnodeEps() { - int32_t len = 0; - int32_t maxLen = 2000; - char * content = calloc(1, maxLen + 1); - cJSON * root = NULL; - FILE * fp = NULL; - SMInfos mInfos = {0}; - bool nodeChanged = false; - - fp = fopen(tsDmeps.file, "r"); - if (!fp) { - dDebug("file %s not exist", tsDmeps.file); - goto PARSE_MINFOS_OVER; - } - - len = (int32_t)fread(content, 1, maxLen, fp); - if (len <= 0) { - dError("failed to read %s since content is null", tsDmeps.file); - goto PARSE_MINFOS_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", tsDmeps.file); - goto PARSE_MINFOS_OVER; - } - - cJSON *inUse = cJSON_GetObjectItem(root, "inUse"); - if (!inUse || inUse->type != cJSON_Number) { - dError("failed to read mnodeEpSet.json since inUse not found"); - goto PARSE_MINFOS_OVER; - } - tsDmeps.mnodeInfos.inUse = (int8_t)inUse->valueint; - - cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum"); - if (!nodeNum || nodeNum->type != cJSON_Number) { - dError("failed to read mnodeEpSet.json since nodeNum not found"); - goto PARSE_MINFOS_OVER; - } - mInfos.mnodeNum = (int8_t)nodeNum->valueint; - - cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos"); - if (!nodeInfos || nodeInfos->type != cJSON_Array) { - dError("failed to read mnodeEpSet.json since nodeInfos not found"); - goto PARSE_MINFOS_OVER; - } - - int32_t size = cJSON_GetArraySize(nodeInfos); - if (size != mInfos.mnodeNum) { - dError("failed to read mnodeEpSet.json since nodeInfos size not matched"); - goto PARSE_MINFOS_OVER; - } - - for (int32_t i = 0; i < size; ++i) { - cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); - if (nodeInfo == NULL) continue; - - cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); - if (!nodeId || nodeId->type != cJSON_Number) { - dError("failed to read mnodeEpSet.json since nodeId not found"); - goto PARSE_MINFOS_OVER; - } - - cJSON *nodeEp = cJSON_GetObjectItem(nodeInfo, "nodeEp"); - if (!nodeEp || nodeEp->type != cJSON_String || nodeEp->valuestring == NULL) { - dError("failed to read mnodeEpSet.json since nodeName not found"); - goto PARSE_MINFOS_OVER; - } - - SMInfo *mInfo = &mInfos.mnodeInfos[i]; - mInfo->mnodeId = (int32_t)nodeId->valueint; - tstrncpy(mInfo->mnodeEp, nodeEp->valuestring, TSDB_EP_LEN); - - bool changed = dnodeIsDnodeEpChanged(mInfo->mnodeId, mInfo->mnodeEp); - if (changed) nodeChanged = changed; - } - - dInfo("successed to read file %s", tsDmeps.file); - -PARSE_MINFOS_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (fp != NULL) fclose(fp); - terrno = 0; - - for (int32_t i = 0; i < mInfos.mnodeNum; ++i) { - SMInfo *mInfo = &mInfos.mnodeInfos[i]; - dnodeGetDnodeEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); - } - - dnodeResetMnodeEps(&mInfos); - - if (nodeChanged) { - dnodeWriteMnodeEps(); - } - - return 0; -} - -void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { - SRpcConnInfo connInfo = {0}; - rpcGetConnInfo(rpcMsg->handle, &connInfo); - - SRpcEpSet epSet = {0}; - if (forShell) { - dnodeGetEpSetForShell(&epSet); - } else { - dnodeGetEpSetForPeer(&epSet); - } - - dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType], - taosIpStr(connInfo.clientIp), connInfo.user, epSet.numOfEps, epSet.inUse); - - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%d", i, epSet.fqdn[i], epSet.port[i]); - if (strcmp(epSet.fqdn[i], tsLocalFqdn) == 0) { - if ((epSet.port[i] == tsServerPort + TSDB_PORT_DNODEDNODE && !forShell) || - (epSet.port[i] == tsServerPort && forShell)) { - epSet.inUse = (i + 1) % epSet.numOfEps; - dDebug("mnode index:%d %s:%d set inUse to %d", i, epSet.fqdn[i], epSet.port[i], epSet.inUse); - } - } - - epSet.port[i] = htons(epSet.port[i]); - } - - rpcSendRedirectRsp(rpcMsg->handle, &epSet); -} - -int32_t dnodeInitMnodeEps() { - snprintf(tsDmeps.file, sizeof(tsDmeps.file), "%s/mnodeEpSet.json", tsDnodeDir); - pthread_mutex_init(&tsDmeps.mutex, NULL); - - dnodeResetMnodeEps(NULL); - int32_t ret = dnodeReadMnodeEps(); - if (ret == 0) { - dInfo("dnode mInfos is initialized"); - } - - return ret; -} - -void dnodeCleanupMnodeEps() { - pthread_mutex_destroy(&tsDmeps.mutex); -} - -void dnodeUpdateMnodeFromStatus(SMInfos *mInfos) { - if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) { - dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum); - return; - } - - for (int32_t i = 0; i < mInfos->mnodeNum; ++i) { - SMInfo *minfo = &mInfos->mnodeInfos[i]; - minfo->mnodeId = htonl(minfo->mnodeId); - if (minfo->mnodeId <= 0 || strlen(minfo->mnodeEp) <= 5) { - dError("invalid mInfo:%d since id:%d and ep:%s invalid", i, minfo->mnodeId, minfo->mnodeEp); - return; - } - } - - pthread_mutex_lock(&tsDmeps.mutex); - if (mInfos->mnodeNum != tsDmeps.mnodeInfos.mnodeNum) { - dnodeResetMnodeEps(mInfos); - dnodeWriteMnodeEps(); - } else { - int32_t size = sizeof(SMInfos); - if (memcmp(mInfos, &tsDmeps.mnodeInfos, size) != 0) { - dnodeResetMnodeEps(mInfos); - dnodeWriteMnodeEps(); - } - } - pthread_mutex_unlock(&tsDmeps.mutex); -} - -void dnodeUpdateMnodeFromPeer(SRpcEpSet *ep) { - if (ep->numOfEps <= 0) { - dError("mInfos is changed, but content is invalid, discard it"); - return; - } - - pthread_mutex_lock(&tsDmeps.mutex); - - dInfo("mInfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); - for (int32_t i = 0; i < ep->numOfEps; ++i) { - ep->port[i] -= TSDB_PORT_DNODEDNODE; - dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]); - } - tsDmeps.mnodeEpSet = *ep; - - pthread_mutex_unlock(&tsDmeps.mutex); -} - -void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { - pthread_mutex_lock(&tsDmeps.mutex); - - *epSet = tsDmeps.mnodeEpSet; - for (int32_t i = 0; i < epSet->numOfEps; ++i) { - epSet->port[i] += TSDB_PORT_DNODEDNODE; - } - - pthread_mutex_unlock(&tsDmeps.mutex); -} - -void dnodeGetEpSetForShell(SRpcEpSet *epSet) { - pthread_mutex_lock(&tsDmeps.mutex); - *epSet = tsDmeps.mnodeEpSet; - pthread_mutex_unlock(&tsDmeps.mutex); -} diff --git a/source/server/dnode/src/dnodeMsg.c b/source/server/dnode/src/dnodeMsg.c new file mode 100644 index 0000000000..efdbfa14ca --- /dev/null +++ b/source/server/dnode/src/dnodeMsg.c @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "tthread.h" +#include "dnodeEps.h" +#include "dnodeMsg.h" +#include "mnode.h" +#include "vnode.h" +#include "ttime.h" + +static struct { + pthread_t *threadId; + bool stop; + uint32_t rebootTime; +} tsMsg; + +static void dnodeSendStatusMsg() { + int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); + SStatusMsg *pStatus = rpcMallocCont(contLen); + if (pStatus == NULL) { + dError("failed to malloc status message"); + return; + } + + pStatus->version = htonl(tsVersion); + pStatus->dnodeId = htonl(dnodeGetDnodeId()); + tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); + pStatus->clusterId = htobe64(dnodeGetClusterId()); + pStatus->lastReboot = htonl(tsMsg.rebootTime); + pStatus->numOfCores = htonl(tsNumOfCores); + pStatus->diskAvailable = tsAvailDataDirGB; + + // fill cluster cfg parameters + pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); + pStatus->clusterCfg.checkTime = 0; + tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64); + char timestr[32] = "1970-01-01 00:00:00.00"; + (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN); + tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); + + vnodeGetStatus(pStatus); + contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); + pStatus->openVnodes = htons(pStatus->openVnodes); + + SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; + + dnodeSendMsgToMnode(&rpcMsg); +} + +void dnodeProcessStatusRsp(SRpcMsg *pMsg) { + dTrace("status rsp is received, code:%s", tstrerror(pMsg->code)); + if (pMsg->code != TSDB_CODE_SUCCESS) return; + + SStatusRsp *pStatusRsp = pMsg->pCont; + + SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->clusterId = htobe64(pCfg->clusterId); + pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); + pCfg->numOfDnodes = htonl(pCfg->numOfDnodes); + dnodeUpdateCfg(pCfg); + + if (pCfg->dropped) { + dError("status rsp is received, and set dnode to drop status"); + return; + } + + vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); + + SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); + eps->dnodeNum = htonl(eps->dnodeNum); + for (int32_t i = 0; i < eps->dnodeNum; ++i) { + eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId); + eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort); + } + + dnodeUpdateDnodeEps(eps); +} + +static void *dnodeThreadRoutine(void *param) { + int32_t ms = tsStatusInterval * 1000; + while (!tsMsg.stop) { + taosMsleep(ms); + dnodeSendStatusMsg(); + } +} + +int32_t dnodeInitMsg() { + tsMsg.stop = false; + tsMsg.rebootTime = taosGetTimestampSec(); + tsMsg.threadId = taosCreateThread(dnodeThreadRoutine, NULL); + if (tsMsg.threadId == NULL) { + return -1; + } + + dInfo("dnode msg is initialized"); + return 0; +} + +void dnodeCleanupMsg() { + if (tsMsg.threadId != NULL) { + tsMsg.stop = true; + taosDestoryThread(tsMsg.threadId); + tsMsg.threadId = NULL; + } + + dInfo("dnode msg is cleanuped"); +} + +static int32_t dnodeStartMnode(SRpcMsg *pMsg) { + SCreateMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + pCfg->mnodeNum = htonl(pCfg->mnodeNum); + for (int32_t i = 0; i < pCfg->mnodeNum; ++i) { + pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId); + pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort); + } + + if (pCfg->dnodeId != dnodeGetDnodeId()) { + dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); + return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + } + + if (mnodeGetStatus() == MN_STATUS_READY) return 0; + + return mnodeDeploy(); +} + +void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) { + int32_t code = dnodeStartMnode(pMsg); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { + SCfgDnodeMsg *pCfg = pMsg->pCont; + + int32_t code = taosCfgDynamicOptions(pCfg->config); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessStartupReq(SRpcMsg *pMsg) { + dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); + + SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); + dnodeGetStartup(pStep); + + dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); + + SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); +} diff --git a/source/server/dnode/src/dnodeStatus.c b/source/server/dnode/src/dnodeStatus.c deleted file mode 100644 index 4edcc2e8c2..0000000000 --- a/source/server/dnode/src/dnodeStatus.c +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "ttime.h" -#include "ttimer.h" -#include "tglobal.h" -#include "dnodeCfg.h" -#include "dnodeDnodeEps.h" -#include "dnodeMnodeEps.h" -#include "dnodeStatus.h" -#include "dnodeMain.h" -#include "vnode.h" - -static struct { - void * dnodeTimer; - void * statusTimer; - uint32_t rebootTime; -} tsStatus; - -static void dnodeSendStatusMsg(void *handle, void *tmrId) { - if (tsStatus.statusTimer == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); - dError("failed to start status timer"); - return; - } - - int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); - SStatusMsg *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); - dError("failed to malloc status message"); - return; - } - - dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId); - pStatus->dnodeId = htonl(dnodeGetDnodeId()); - pStatus->version = htonl(tsVersion); - pStatus->lastReboot = htonl(tsStatus.rebootTime); - pStatus->numOfCores = htons((uint16_t)tsNumOfCores); - pStatus->diskAvailable = tsAvailDataDirGB; - pStatus->alternativeRole = tsAlternativeRole; - tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN); - - // fill cluster cfg parameters - pStatus->clusterCfg.numOfMnodes = htonl(tsNumOfMnodes); - pStatus->clusterCfg.mnodeEqualVnodeNum = htonl(tsMnodeEqualVnodeNum); - pStatus->clusterCfg.offlineThreshold = htonl(tsOfflineThreshold); - pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval); - pStatus->clusterCfg.maxtablesPerVnode = htonl(tsMaxTablePerVnode); - pStatus->clusterCfg.maxVgroupsPerDb = htonl(tsMaxVgroupsPerDb); - tstrncpy(pStatus->clusterCfg.arbitrator, tsArbitrator, TSDB_EP_LEN); - tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64); - pStatus->clusterCfg.checkTime = 0; - char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN); - - pStatus->clusterCfg.enableBalance = tsEnableBalance; - pStatus->clusterCfg.flowCtrl = tsEnableFlowCtrl; - pStatus->clusterCfg.slaveQuery = tsEnableSlaveQuery; - pStatus->clusterCfg.adjustMaster = tsEnableAdjustMaster; - - vnodeGetStatus(pStatus); - contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); - pStatus->openVnodes = htons(pStatus->openVnodes); - - SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; - - dnodeSendMsgToMnode(&rpcMsg); -} - -void dnodeProcessStatusRsp(SRpcMsg *pMsg) { - if (pMsg->code != TSDB_CODE_SUCCESS) { - dError("status rsp is received, error:%s", tstrerror(pMsg->code)); - if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) { - char clusterId[TSDB_CLUSTER_ID_LEN]; - dnodeGetClusterId(clusterId); - if (clusterId[0] != '\0') { - dnodeSetDropped(); - dError("exit zombie dropped dnode"); - exit(EXIT_FAILURE); - } - } - - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); - return; - } - - SStatusRsp *pStatusRsp = pMsg->pCont; - SMInfos * minfos = &pStatusRsp->mnodes; - dnodeUpdateMnodeFromStatus(minfos); - - SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; - pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); - pCfg->moduleStatus = htonl(pCfg->moduleStatus); - pCfg->dnodeId = htonl(pCfg->dnodeId); - dnodeUpdateCfg(pCfg); - - vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); - - SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); - dnodeUpdateDnodeEps(pEps); - - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); -} - -int32_t dnodeInitStatus() { - tsStatus.statusTimer = NULL; - tsStatus.dnodeTimer = dnodeGetTimer(); - tsStatus.rebootTime = taosGetTimestampSec(); - taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); - dInfo("dnode status timer is initialized"); - return TSDB_CODE_SUCCESS; -} - -void dnodeCleanupStatus() { - if (tsStatus.statusTimer != NULL) { - taosTmrStopA(&tsStatus.statusTimer); - tsStatus.statusTimer = NULL; - } -} diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 7b48bea622..20601f23e7 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -21,9 +21,8 @@ #define _DEFAULT_SOURCE #include "dnodeTrans.h" -#include "dnodeMain.h" -#include "dnodeMnodeEps.h" -#include "dnodeStatus.h" +#include "dnodeEps.h" +#include "dnodeMsg.h" #include "mnode.h" #include "vnode.h" @@ -97,8 +96,8 @@ static int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -139,7 +138,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeFromPeer(pEpSet); + dnodeUpdateMnodeEps(pEpSet); } RpcMsgFp fp = tsTrans.peerMsgFp[msgType];