diff --git a/source/server/dnode/inc/dnodeMain.h b/source/server/dnode/inc/dnodeMain.h new file mode 100644 index 0000000000..f9c15db1c1 --- /dev/null +++ b/source/server/dnode/inc/dnodeMain.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DNODE_MAIN_H_ +#define _TD_DNODE_MAIN_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dnodeInt.h" + +typedef enum { + TD_RUN_STAT_INIT, + TD_RUN_STAT_RUNNING, + TD_RUN_STAT_STOPPED +} RunStat; + +typedef struct DnMain { + Dnode * dnode; + RunStat runStatus; + void * dnodeTimer; + SStartupStep startup; +} DnMain; + +int32_t dnodeInitMain(Dnode *dnode, DnMain **main); +void dnodeCleanupMain(DnMain **main); +int32_t dnodeInitStorage(Dnode *dnode, void **unused); +void dnodeCleanupStorage(void **unused); +void dnodeReportStartup(Dnode *dnode, char *name, char *desc); +void dnodeReportStartupFinished(Dnode *dnode, char *name, char *desc); +void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg); +void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg); +void dnodeProcessConfigDnodeReq(Dnode *dnode, SRpcMsg *pMsg); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DNODE_MAIN_H_*/ diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c new file mode 100644 index 0000000000..a792cf6d5b --- /dev/null +++ b/source/server/dnode/src/dnodeMain.c @@ -0,0 +1,260 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "tcache.h" +#include "tconfig.h" +#include "tfs.h" +#include "tnote.h" +#include "tscompression.h" +#include "ttimer.h" +#include "dnodeCfg.h" +#include "dnodeMain.h" +#include "mnode.h" + +static int32_t dnodeCreateDir(const char *dir) { + if (mkdir(dir, 0755) != 0 && errno != EEXIST) { + return -1; + } + + return 0; +} + +static void dnodeCheckDataDirOpenned(char *dir) { + char filepath[256] = {0}; + snprintf(filepath, sizeof(filepath), "%s/.running", dir); + + int32_t fd = open(filepath, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + if (fd < 0) { + dError("failed to open lock file:%s, reason: %s, quit", filepath, strerror(errno)); + exit(0); + } + + int32_t ret = flock(fd, LOCK_EX | LOCK_NB); + if (ret != 0) { + dError("failed to lock file:%s ret:%d since %s, database may be running, quit", filepath, ret, strerror(errno)); + close(fd); + exit(0); + } +} + +int32_t dnodeInitMain(Dnode *dnode, DnMain **out) { + DnMain* main = calloc(1, sizeof(DnMain)); + if (main == NULL) return -1; + + main->dnode = dnode; + main->runStatus = TD_RUN_STAT_STOPPED; + main->dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); + if (main->dnodeTimer == NULL) { + dError("failed to init dnode timer"); + return -1; + } + + *out = main; + + tscEmbedded = 1; + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + taosInitGlobalCfg(); + taosReadGlobalLogCfg(); + taosSetCoreDump(); + + if (dnodeCreateDir(tsLogDir) < 0) { + printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); + return -1; + } + + char temp[TSDB_FILENAME_LEN]; + sprintf(temp, "%s/taosdlog", tsLogDir); + if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) { + printf("failed to init log file\n"); + } + + if (!taosReadGlobalCfg()) { + taosPrintGlobalCfg(); + dError("TDengine read global config failed"); + return -1; + } + + dInfo("start to initialize TDengine"); + + taosInitNotes(); + + return taosCheckGlobalCfg(); +} + +void dnodeCleanupMain(DnMain **out) { + DnMain *main = *out; + *out = NULL; + + if (main->dnodeTimer != NULL) { + taosTmrCleanUp(main->dnodeTimer); + main->dnodeTimer = NULL; + } + + taos_cleanup(); + taosCloseLog(); + taosStopCacheRefreshWorker(); + + free(main); +} + +int32_t dnodeInitStorage(Dnode *dnode, void **m) { +#ifdef TD_TSZ + // compress module init + tsCompressInit(); +#endif + + // storage module init + if (tsDiskCfgNum == 1 && dnodeCreateDir(tsDataDir) < 0) { + dError("failed to create dir:%s since %s", tsDataDir, strerror(errno)); + return -1; + } + + if (tfsInit(tsDiskCfg, tsDiskCfgNum) < 0) { + dError("failed to init TFS since %s", tstrerror(terrno)); + return -1; + } + strncpy(tsDataDir, TFS_PRIMARY_PATH(), TSDB_FILENAME_LEN); + sprintf(tsMnodeDir, "%s/mnode", tsDataDir); + sprintf(tsVnodeDir, "%s/vnode", tsDataDir); + sprintf(tsDnodeDir, "%s/dnode", tsDataDir); + + if (dnodeCreateDir(tsMnodeDir) < 0) { + dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); + return -1; + } + + if (dnodeCreateDir(tsDnodeDir) < 0) { + dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); + return -1; + } + + if (tfsMkdir("vnode") < 0) { + dError("failed to create vnode dir since %s", tstrerror(terrno)); + return -1; + } + + if (tfsMkdir("vnode_bak") < 0) { + dError("failed to create vnode_bak dir since %s", tstrerror(terrno)); + return -1; + } + + TDIR *tdir = tfsOpendir("vnode_bak/.staging"); + bool stagingNotEmpty = tfsReaddir(tdir) != NULL; + tfsClosedir(tdir); + + if (stagingNotEmpty) { + dError("vnode_bak/.staging dir not empty, fix it first."); + return -1; + } + + if (tfsMkdir("vnode_bak/.staging") < 0) { + dError("failed to create vnode_bak/.staging dir since %s", tstrerror(terrno)); + return -1; + } + + dnodeCheckDataDirOpenned(tsDnodeDir); + + taosGetDisk(); + taosPrintDiskInfo(); + + dInfo("dnode storage is initialized at %s", tsDnodeDir); + return 0; +} + +void dnodeCleanupStorage(void **m) { + // storage destroy + tfsDestroy(); + + #ifdef TD_TSZ + // compress destroy + tsCompressExit(); + #endif +} + +void dnodeReportStartup(Dnode *dnode, char *name, char *desc) { + SStartupStep *startup = &dnode->main->startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 0; +} + +void dnodeReportStartupFinished(Dnode *dnode, char *name, char *desc) { + SStartupStep *startup = &dnode->main->startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 1; +} + +void dnodeProcessStartupReq(Dnode *dnode, SRpcMsg *pMsg) { + dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); + + SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); + memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep)); + + dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); + + SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); +} + +static int32_t dnodeStartMnode(Dnode *dnode, SRpcMsg *pMsg) { + SCreateMnodeMsg *pCfg = pMsg->pCont; + pCfg->dnodeId = htonl(pCfg->dnodeId); + if (pCfg->dnodeId != dnodeGetDnodeId(dnode->cfg)) { + dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, + dnodeGetDnodeId(dnode->cfg)); + return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; + } + + if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { + dDebug("dnodeEp:%s, in create meps msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); + return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; + } + + dDebug("dnode:%d, create meps msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.mnodeNum); + for (int32_t i = 0; i < pCfg->mnodes.mnodeNum; ++i) { + pCfg->mnodes.mnodeInfos[i].mnodeId = htonl(pCfg->mnodes.mnodeInfos[i].mnodeId); + dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp); + } + + if (mnodeIsServing(dnode->mnode)) return 0; + + return mnodeDeploy(dnode->mnode, &pCfg->mnodes); +} + +void dnodeProcessCreateMnodeReq(Dnode *dnode, SRpcMsg *pMsg) { + int32_t code = dnodeStartMnode(dnode, pMsg); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} + +void dnodeProcessConfigDnodeReq(Dnode *dnode, SRpcMsg *pMsg) { + SCfgDnodeMsg *pCfg = pMsg->pCont; + + int32_t code = taosCfgDynamicOptions(pCfg->config); + + SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code}; + + rpcSendResponse(&rspMsg); + rpcFreeCont(pMsg->pCont); +} \ No newline at end of file