Merge pull request #9755 from taosdata/feature/dnode3
react global variables
This commit is contained in:
commit
e990ff5ade
|
@ -23,9 +23,9 @@ extern "C" {
|
|||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||
typedef struct SDnode SDnode;
|
||||
typedef struct SBnode SBnode;
|
||||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
|
||||
|
||||
typedef struct {
|
||||
int64_t numOfErrors;
|
||||
|
@ -33,12 +33,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t sver;
|
||||
} SBnodeCfg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
SBnodeCfg cfg;
|
||||
SDnode *pDnode;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
|
|
|
@ -22,15 +22,39 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||
/* ------------------------ TYPES EXPOSED ---------------- */
|
||||
typedef struct SDnode SDnode;
|
||||
|
||||
/* ------------------------ Environment ------------------ */
|
||||
typedef struct {
|
||||
int32_t sver;
|
||||
int32_t numOfCores;
|
||||
int32_t numOfSupportVnodes;
|
||||
int16_t numOfCommitThreads;
|
||||
int8_t enableTelem;
|
||||
char timezone[TSDB_TIMEZONE_LEN];
|
||||
char locale[TSDB_LOCALE_LEN];
|
||||
char charset[TSDB_LOCALE_LEN];
|
||||
char buildinfo[64];
|
||||
char gitinfo[48];
|
||||
} SDnodeEnvCfg;
|
||||
|
||||
/**
|
||||
* @brief Initialize the environment
|
||||
*
|
||||
* @param pOption Option of the environment
|
||||
* @return int32_t 0 for success and -1 for failure
|
||||
*/
|
||||
int32_t dndInit(const SDnodeEnvCfg *pCfg);
|
||||
|
||||
/**
|
||||
* @brief clear the environment
|
||||
*
|
||||
*/
|
||||
void dndCleanup();
|
||||
|
||||
/* ------------------------ SDnode ----------------------- */
|
||||
typedef struct {
|
||||
int32_t numOfSupportVnodes;
|
||||
int32_t statusInterval;
|
||||
float numOfThreadsPerCore;
|
||||
float ratioOfQueryCores;
|
||||
|
@ -41,28 +65,22 @@ typedef struct {
|
|||
char localEp[TSDB_EP_LEN];
|
||||
char localFqdn[TSDB_FQDN_LEN];
|
||||
char firstEp[TSDB_EP_LEN];
|
||||
char timezone[TSDB_TIMEZONE_LEN];
|
||||
char locale[TSDB_LOCALE_LEN];
|
||||
char charset[TSDB_LOCALE_LEN];
|
||||
char buildinfo[64];
|
||||
char gitinfo[48];
|
||||
} SDnodeOpt;
|
||||
} SDnodeObjCfg;
|
||||
|
||||
/* ------------------------ SDnode ------------------------ */
|
||||
/**
|
||||
* @brief Initialize and start the dnode.
|
||||
*
|
||||
* @param pOption Option of the dnode.
|
||||
* @param pCfg Config of the dnode.
|
||||
* @return SDnode* The dnode object.
|
||||
*/
|
||||
SDnode *dndInit(SDnodeOpt *pOption);
|
||||
SDnode *dndCreate(SDnodeObjCfg *pCfg);
|
||||
|
||||
/**
|
||||
* @brief Stop and cleanup the dnode.
|
||||
*
|
||||
* @param pDnode The dnode object to close.
|
||||
*/
|
||||
void dndCleanup(SDnode *pDnode);
|
||||
void dndClose(SDnode *pDnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -23,9 +23,9 @@ extern "C" {
|
|||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||
typedef struct SDnode SDnode;
|
||||
typedef struct SQnode SQnode;
|
||||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
|
||||
|
||||
typedef struct {
|
||||
int64_t numOfStartTask;
|
||||
|
@ -40,12 +40,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t sver;
|
||||
} SQnodeCfg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
SQnodeCfg cfg;
|
||||
SDnode *pDnode;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
|
|
|
@ -23,9 +23,9 @@ extern "C" {
|
|||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||
typedef struct SDnode SDnode;
|
||||
typedef struct SSnode SSnode;
|
||||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
|
||||
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *pMsg);
|
||||
typedef int32_t (*SendReqToMnodeFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
|
||||
typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *pMsg);
|
||||
|
||||
typedef struct {
|
||||
int64_t numOfErrors;
|
||||
|
@ -33,12 +33,8 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t sver;
|
||||
} SSnodeCfg;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
SSnodeCfg cfg;
|
||||
SDnode *pDnode;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
|
|
|
@ -89,10 +89,10 @@ typedef struct {
|
|||
int vnodeInit(const SVnodeOpt *pOption);
|
||||
|
||||
/**
|
||||
* @brief clear a vnode
|
||||
* @brief Cleanup the vnode module
|
||||
*
|
||||
*/
|
||||
void vnodeClear();
|
||||
void vnodeCleanup();
|
||||
|
||||
/**
|
||||
* @brief Open a VNODE.
|
||||
|
|
|
@ -49,7 +49,7 @@ typedef struct {
|
|||
} STierMeta;
|
||||
|
||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
||||
void tfsDestroy();
|
||||
void tfsCleanup();
|
||||
void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
|
||||
void tfsGetMeta(SFSMeta *pMeta);
|
||||
void tfsAllocDisk(int expLevel, int *level, int *id);
|
||||
|
|
|
@ -70,6 +70,8 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108)
|
||||
#define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109)
|
||||
#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A)
|
||||
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x010B)
|
||||
|
||||
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
|
||||
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
|
||||
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
|
||||
|
|
|
@ -30,12 +30,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SBnode {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
SBnodeCfg cfg;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
SendRedirectRspFp sendRedirectRspFp;
|
||||
SBnodeOpt opt;
|
||||
} SBnode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -28,11 +28,11 @@ static struct {
|
|||
bool printAuth;
|
||||
bool printVersion;
|
||||
char configDir[PATH_MAX];
|
||||
} global = {0};
|
||||
} dmn = {0};
|
||||
|
||||
void dmnSigintHandle(int signum, void *info, void *ctx) {
|
||||
uInfo("singal:%d is received", signum);
|
||||
global.stop = true;
|
||||
dmn.stop = true;
|
||||
}
|
||||
|
||||
void dmnSetSignalHandle() {
|
||||
|
@ -44,7 +44,7 @@ void dmnSetSignalHandle() {
|
|||
}
|
||||
|
||||
int dmnParseOption(int argc, char const *argv[]) {
|
||||
tstrncpy(global.configDir, "/etc/taos", PATH_MAX);
|
||||
tstrncpy(dmn.configDir, "/etc/taos", PATH_MAX);
|
||||
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
if (strcmp(argv[i], "-c") == 0) {
|
||||
|
@ -53,19 +53,19 @@ int dmnParseOption(int argc, char const *argv[]) {
|
|||
printf("config file path overflow");
|
||||
return -1;
|
||||
}
|
||||
tstrncpy(global.configDir, argv[i], PATH_MAX);
|
||||
tstrncpy(dmn.configDir, argv[i], PATH_MAX);
|
||||
} else {
|
||||
printf("'-c' requires a parameter, default is %s\n", configDir);
|
||||
return -1;
|
||||
}
|
||||
} else if (strcmp(argv[i], "-C") == 0) {
|
||||
global.dumpConfig = true;
|
||||
dmn.dumpConfig = true;
|
||||
} else if (strcmp(argv[i], "-k") == 0) {
|
||||
global.generateGrant = true;
|
||||
dmn.generateGrant = true;
|
||||
} else if (strcmp(argv[i], "-A") == 0) {
|
||||
global.printAuth = true;
|
||||
dmn.printAuth = true;
|
||||
} else if (strcmp(argv[i], "-V") == 0) {
|
||||
global.printVersion = true;
|
||||
dmn.printVersion = true;
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ void dmnPrintVersion() {
|
|||
}
|
||||
|
||||
int dmnReadConfig(const char *path) {
|
||||
tstrncpy(configDir, global.configDir, PATH_MAX);
|
||||
tstrncpy(configDir, dmn.configDir, PATH_MAX);
|
||||
taosInitGlobalCfg();
|
||||
taosReadGlobalLogCfg();
|
||||
|
||||
|
@ -114,12 +114,12 @@ int dmnReadConfig(const char *path) {
|
|||
}
|
||||
|
||||
if (taosReadCfgFromFile() != 0) {
|
||||
uError("failed to read global config");
|
||||
uError("failed to read config");
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosCheckAndPrintCfg() != 0) {
|
||||
uError("failed to check global config");
|
||||
uError("failed to check config");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -131,38 +131,50 @@ void dmnDumpConfig() { taosDumpGlobalCfg(); }
|
|||
|
||||
void dmnWaitSignal() {
|
||||
dmnSetSignalHandle();
|
||||
while (!global.stop) {
|
||||
while (!dmn.stop) {
|
||||
taosMsleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
void dmnInitOption(SDnodeOpt *pOption) {
|
||||
pOption->sver = 30000000; //3.0.0.0
|
||||
pOption->numOfCores = tsNumOfCores;
|
||||
pOption->numOfSupportVnodes = tsNumOfSupportVnodes;
|
||||
pOption->numOfCommitThreads = tsNumOfCommitThreads;
|
||||
pOption->statusInterval = tsStatusInterval;
|
||||
pOption->numOfThreadsPerCore = tsNumOfThreadsPerCore;
|
||||
pOption->ratioOfQueryCores = tsRatioOfQueryCores;
|
||||
pOption->maxShellConns = tsMaxShellConns;
|
||||
pOption->shellActivityTimer = tsShellActivityTimer;
|
||||
pOption->serverPort = tsServerPort;
|
||||
tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN);
|
||||
tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN);
|
||||
tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||
tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN);
|
||||
tstrncpy(pOption->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
|
||||
tstrncpy(pOption->locale, tsLocale, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pOption->charset, tsCharset, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pOption->buildinfo, buildinfo, 64);
|
||||
tstrncpy(pOption->gitinfo, gitinfo, 48);
|
||||
void dnmInitEnvCfg(SDnodeEnvCfg *pCfg) {
|
||||
pCfg->sver = 30000000; // 3.0.0.0
|
||||
pCfg->numOfCores = tsNumOfCores;
|
||||
pCfg->numOfCommitThreads = tsNumOfCommitThreads;
|
||||
pCfg->enableTelem = 0;
|
||||
tstrncpy(pCfg->timezone, tsTimezone, TSDB_TIMEZONE_LEN);
|
||||
tstrncpy(pCfg->locale, tsLocale, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pCfg->charset, tsCharset, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pCfg->buildinfo, buildinfo, 64);
|
||||
tstrncpy(pCfg->gitinfo, gitinfo, 48);
|
||||
}
|
||||
|
||||
void dmnInitObjCfg(SDnodeObjCfg *pCfg) {
|
||||
pCfg->numOfSupportVnodes = tsNumOfSupportVnodes;
|
||||
pCfg->statusInterval = tsStatusInterval;
|
||||
pCfg->numOfThreadsPerCore = tsNumOfThreadsPerCore;
|
||||
pCfg->ratioOfQueryCores = tsRatioOfQueryCores;
|
||||
pCfg->maxShellConns = tsMaxShellConns;
|
||||
pCfg->shellActivityTimer = tsShellActivityTimer;
|
||||
pCfg->serverPort = tsServerPort;
|
||||
tstrncpy(pCfg->dataDir, tsDataDir, TSDB_FILENAME_LEN);
|
||||
tstrncpy(pCfg->localEp, tsLocalEp, TSDB_EP_LEN);
|
||||
tstrncpy(pCfg->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN);
|
||||
tstrncpy(pCfg->firstEp, tsFirst, TSDB_EP_LEN);
|
||||
}
|
||||
|
||||
int dmnRunDnode() {
|
||||
SDnodeOpt option = {0};
|
||||
dmnInitOption(&option);
|
||||
SDnodeEnvCfg envCfg = {0};
|
||||
SDnodeObjCfg objCfg = {0};
|
||||
|
||||
SDnode *pDnode = dndInit(&option);
|
||||
dnmInitEnvCfg(&envCfg);
|
||||
dmnInitObjCfg(&objCfg);
|
||||
|
||||
if (dndInit(&envCfg) != 0) {
|
||||
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDnode *pDnode = dndCreate(&objCfg);
|
||||
if (pDnode == NULL) {
|
||||
uInfo("Failed to start TDengine, please check the log at %s", tsLogDir);
|
||||
return -1;
|
||||
|
@ -172,7 +184,8 @@ int dmnRunDnode() {
|
|||
dmnWaitSignal();
|
||||
uInfo("TDengine is shut down!");
|
||||
|
||||
dndCleanup(pDnode);
|
||||
dndClose(pDnode);
|
||||
dndCleanup();
|
||||
taosCloseLog();
|
||||
return 0;
|
||||
}
|
||||
|
@ -182,21 +195,21 @@ int main(int argc, char const *argv[]) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (global.generateGrant) {
|
||||
if (dmn.generateGrant) {
|
||||
dmnGenerateGrant();
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (global.printVersion) {
|
||||
if (dmn.printVersion) {
|
||||
dmnPrintVersion();
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (dmnReadConfig(global.configDir) != 0) {
|
||||
if (dmnReadConfig(dmn.configDir) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (global.dumpConfig) {
|
||||
if (dmn.dumpConfig) {
|
||||
dmnDumpConfig();
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitBnode(SDnode *pDnode);
|
||||
void dndCleanupBnode(SDnode *pDnode);
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DND_ENV_H_
|
||||
#define _TD_DND_ENV_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "dndInt.h"
|
||||
|
||||
typedef struct {
|
||||
EWorkerType type;
|
||||
const char *name;
|
||||
int32_t minNum;
|
||||
int32_t maxNum;
|
||||
void *queueFp;
|
||||
SDnode *pDnode;
|
||||
STaosQueue *queue;
|
||||
union {
|
||||
SWorkerPool pool;
|
||||
SMWorkerPool mpool;
|
||||
};
|
||||
} SDnodeWorker;
|
||||
|
||||
typedef struct {
|
||||
char *dnode;
|
||||
char *mnode;
|
||||
char *snode;
|
||||
char *bnode;
|
||||
char *vnodes;
|
||||
} SDnodeDir;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int32_t dropped;
|
||||
int64_t clusterId;
|
||||
int64_t dver;
|
||||
int64_t rebootTime;
|
||||
int64_t updateTime;
|
||||
int8_t statusSent;
|
||||
SEpSet mnodeEpSet;
|
||||
char *file;
|
||||
SHashObj *dnodeHash;
|
||||
SDnodeEps *dnodeEps;
|
||||
pthread_t *threadId;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker mgmtWorker;
|
||||
SDnodeWorker statusWorker;
|
||||
} SDnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SMnode *pMnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker readWorker;
|
||||
SDnodeWorker writeWorker;
|
||||
SDnodeWorker syncWorker;
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
} SMnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SQnode *pQnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker queryWorker;
|
||||
SDnodeWorker fetchWorker;
|
||||
} SQnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SSnode *pSnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker writeWorker;
|
||||
} SSnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SBnode *pBnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker writeWorker;
|
||||
} SBnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
SHashObj *hash;
|
||||
int32_t openVnodes;
|
||||
int32_t totalVnodes;
|
||||
SRWLatch latch;
|
||||
SWorkerPool queryPool;
|
||||
SWorkerPool fetchPool;
|
||||
SMWorkerPool syncPool;
|
||||
SMWorkerPool writePool;
|
||||
} SVnodesMgmt;
|
||||
|
||||
typedef struct {
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
DndMsgFp msgFp[TDMT_MAX];
|
||||
} STransMgmt;
|
||||
|
||||
typedef struct SDnode {
|
||||
EStat stat;
|
||||
SDnodeObjCfg cfg;
|
||||
SDnodeEnvCfg env;
|
||||
SDnodeDir dir;
|
||||
FileFd lockFd;
|
||||
SDnodeMgmt dmgmt;
|
||||
SMnodeMgmt mmgmt;
|
||||
SQnodeMgmt qmgmt;
|
||||
SSnodeMgmt smgmt;
|
||||
SBnodeMgmt bmgmt;
|
||||
SVnodesMgmt vmgmt;
|
||||
STransMgmt tmgmt;
|
||||
SStartupReq startup;
|
||||
} SDnode;
|
||||
|
||||
typedef struct {
|
||||
int8_t once;
|
||||
SDnodeEnvCfg cfg;
|
||||
} SDnodeEnv;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_ENV_H_*/
|
|
@ -55,125 +55,12 @@ extern int32_t dDebugFlag;
|
|||
|
||||
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat;
|
||||
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
|
||||
typedef enum { DND_ENV_INIT = 0, DND_ENV_READY = 1, DND_ENV_CLEANUP = 2 } EEnvStat;
|
||||
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
|
||||
|
||||
typedef struct {
|
||||
EWorkerType type;
|
||||
const char *name;
|
||||
int32_t minNum;
|
||||
int32_t maxNum;
|
||||
void *queueFp;
|
||||
SDnode *pDnode;
|
||||
STaosQueue *queue;
|
||||
union {
|
||||
SWorkerPool pool;
|
||||
SMWorkerPool mpool;
|
||||
};
|
||||
} SDnodeWorker;
|
||||
|
||||
typedef struct {
|
||||
char *dnode;
|
||||
char *mnode;
|
||||
char *snode;
|
||||
char *bnode;
|
||||
char *vnodes;
|
||||
} SDnodeDir;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int32_t dropped;
|
||||
int64_t clusterId;
|
||||
int64_t dver;
|
||||
int64_t rebootTime;
|
||||
int64_t updateTime;
|
||||
int8_t statusSent;
|
||||
SEpSet mnodeEpSet;
|
||||
char *file;
|
||||
SHashObj *dnodeHash;
|
||||
SDnodeEps *dnodeEps;
|
||||
pthread_t *threadId;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker mgmtWorker;
|
||||
SDnodeWorker statusWorker;
|
||||
} SDnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SMnode *pMnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker readWorker;
|
||||
SDnodeWorker writeWorker;
|
||||
SDnodeWorker syncWorker;
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
} SMnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SQnode *pQnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker queryWorker;
|
||||
SDnodeWorker fetchWorker;
|
||||
} SQnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SSnode *pSnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker writeWorker;
|
||||
} SSnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
int32_t refCount;
|
||||
int8_t deployed;
|
||||
int8_t dropped;
|
||||
SBnode *pBnode;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker writeWorker;
|
||||
} SBnodeMgmt;
|
||||
|
||||
typedef struct {
|
||||
SHashObj *hash;
|
||||
int32_t openVnodes;
|
||||
int32_t totalVnodes;
|
||||
SRWLatch latch;
|
||||
SWorkerPool queryPool;
|
||||
SWorkerPool fetchPool;
|
||||
SMWorkerPool syncPool;
|
||||
SMWorkerPool writePool;
|
||||
} SVnodesMgmt;
|
||||
|
||||
typedef struct {
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
DndMsgFp msgFp[TDMT_MAX];
|
||||
} STransMgmt;
|
||||
|
||||
typedef struct SDnode {
|
||||
EStat stat;
|
||||
SDnodeOpt opt;
|
||||
SDnodeDir dir;
|
||||
FileFd lockFd;
|
||||
SDnodeMgmt dmgmt;
|
||||
SMnodeMgmt mmgmt;
|
||||
SQnodeMgmt qmgmt;
|
||||
SSnodeMgmt smgmt;
|
||||
SBnodeMgmt bmgmt;
|
||||
SVnodesMgmt vmgmt;
|
||||
STransMgmt tmgmt;
|
||||
SStartupReq startup;
|
||||
} SDnode;
|
||||
|
||||
EStat dndGetStat(SDnode *pDnode);
|
||||
void dndSetStat(SDnode *pDnode, EStat stat);
|
||||
char *dndStatStr(EStat stat);
|
||||
const char *dndStatStr(EStat stat);
|
||||
|
||||
void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
|
||||
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitMgmt(SDnode *pDnode);
|
||||
void dndStopMgmt(SDnode *pDnode);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitMnode(SDnode *pDnode);
|
||||
void dndCleanupMnode(SDnode *pDnode);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitQnode(SDnode *pDnode);
|
||||
void dndCleanupQnode(SDnode *pDnode);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitSnode(SDnode *pDnode);
|
||||
void dndCleanupSnode(SDnode *pDnode);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitTrans(SDnode *pDnode);
|
||||
void dndCleanupTrans(SDnode *pDnode);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitVnodes(SDnode *pDnode);
|
||||
void dndCleanupVnodes(SDnode *pDnode);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dndInt.h"
|
||||
#include "dndEnv.h"
|
||||
|
||||
int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
|
||||
int32_t maxNum, void *queueFp);
|
||||
|
|
|
@ -179,7 +179,7 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
|
|||
pOption->sendRedirectRspFp = dndSendRedirectRsp;
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
pOption->cfg.sver = pDnode->opt.sver;
|
||||
pOption->sver = pDnode->env.sver;
|
||||
}
|
||||
|
||||
static int32_t dndOpenBnode(SDnode *pDnode) {
|
||||
|
|
|
@ -25,6 +25,8 @@
|
|||
#include "tfs.h"
|
||||
#include "wal.h"
|
||||
|
||||
static SDnodeEnv dndEnv = {0};
|
||||
|
||||
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
|
||||
|
||||
void dndSetStat(SDnode *pDnode, EStat stat) {
|
||||
|
@ -32,7 +34,7 @@ void dndSetStat(SDnode *pDnode, EStat stat) {
|
|||
pDnode->stat = stat;
|
||||
}
|
||||
|
||||
char *dndStatStr(EStat stat) {
|
||||
const char *dndStatStr(EStat stat) {
|
||||
switch (stat) {
|
||||
case DND_STAT_INIT:
|
||||
return "init";
|
||||
|
@ -79,25 +81,26 @@ static FileFd dndCheckRunning(char *dataDir) {
|
|||
return fd;
|
||||
}
|
||||
|
||||
static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
|
||||
pDnode->lockFd = dndCheckRunning(pOption->dataDir);
|
||||
static int32_t dndCreateImp(SDnode *pDnode, SDnodeObjCfg *pCfg) {
|
||||
pDnode->lockFd = dndCheckRunning(pCfg->dataDir);
|
||||
if (pDnode->lockFd < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
char path[PATH_MAX + 100];
|
||||
snprintf(path, sizeof(path), "%s%smnode", pOption->dataDir, TD_DIRSEP);
|
||||
snprintf(path, sizeof(path), "%s%smnode", pCfg->dataDir, TD_DIRSEP);
|
||||
pDnode->dir.mnode = tstrdup(path);
|
||||
snprintf(path, sizeof(path), "%s%svnode", pOption->dataDir, TD_DIRSEP);
|
||||
snprintf(path, sizeof(path), "%s%svnode", pCfg->dataDir, TD_DIRSEP);
|
||||
pDnode->dir.vnodes = tstrdup(path);
|
||||
snprintf(path, sizeof(path), "%s%sdnode", pOption->dataDir, TD_DIRSEP);
|
||||
snprintf(path, sizeof(path), "%s%sdnode", pCfg->dataDir, TD_DIRSEP);
|
||||
pDnode->dir.dnode = tstrdup(path);
|
||||
snprintf(path, sizeof(path), "%s%ssnode", pOption->dataDir, TD_DIRSEP);
|
||||
snprintf(path, sizeof(path), "%s%ssnode", pCfg->dataDir, TD_DIRSEP);
|
||||
pDnode->dir.snode = tstrdup(path);
|
||||
snprintf(path, sizeof(path), "%s%sbnode", pOption->dataDir, TD_DIRSEP);
|
||||
snprintf(path, sizeof(path), "%s%sbnode", pCfg->dataDir, TD_DIRSEP);
|
||||
pDnode->dir.bnode = tstrdup(path);
|
||||
|
||||
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL) {
|
||||
if (pDnode->dir.mnode == NULL || pDnode->dir.vnodes == NULL || pDnode->dir.dnode == NULL ||
|
||||
pDnode->dir.snode == NULL || pDnode->dir.bnode == NULL) {
|
||||
dError("failed to malloc dir object");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -133,11 +136,12 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOption) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
memcpy(&pDnode->opt, pOption, sizeof(SDnodeOpt));
|
||||
memcpy(&pDnode->cfg, pCfg, sizeof(SDnodeObjCfg));
|
||||
memcpy(&pDnode->env, &dndEnv.cfg, sizeof(SDnodeEnvCfg));
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndCleanupEnv(SDnode *pDnode) {
|
||||
static void dndCloseImp(SDnode *pDnode) {
|
||||
tfree(pDnode->dir.mnode);
|
||||
tfree(pDnode->dir.vnodes);
|
||||
tfree(pDnode->dir.dnode);
|
||||
|
@ -149,126 +153,95 @@ static void dndCleanupEnv(SDnode *pDnode) {
|
|||
taosCloseFile(pDnode->lockFd);
|
||||
pDnode->lockFd = 0;
|
||||
}
|
||||
|
||||
taosStopCacheRefreshWorker();
|
||||
}
|
||||
|
||||
SDnode *dndInit(SDnodeOpt *pOption) {
|
||||
taosIgnSIGPIPE();
|
||||
taosBlockSIGPIPE();
|
||||
taosResolveCRC();
|
||||
SDnode *dndCreate(SDnodeObjCfg *pCfg) {
|
||||
dInfo("start to create dnode object");
|
||||
|
||||
SDnode *pDnode = calloc(1, sizeof(SDnode));
|
||||
if (pDnode == NULL) {
|
||||
dError("failed to create dnode object");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
dError("failed to create dnode object since %s", terrstr());
|
||||
return NULL;
|
||||
}
|
||||
|
||||
dInfo("start to initialize TDengine");
|
||||
dndSetStat(pDnode, DND_STAT_INIT);
|
||||
|
||||
if (dndInitEnv(pDnode, pOption) != 0) {
|
||||
dError("failed to init env");
|
||||
dndCleanup(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rpcInit() != 0) {
|
||||
dError("failed to init rpc env");
|
||||
dndCleanup(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (walInit() != 0) {
|
||||
dError("failed to init wal env");
|
||||
dndCleanup(pDnode);
|
||||
if (dndCreateImp(pDnode, pCfg) != 0) {
|
||||
dError("failed to init dnode dir since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SDiskCfg dCfg;
|
||||
strcpy(dCfg.dir, pDnode->opt.dataDir);
|
||||
strcpy(dCfg.dir, pDnode->cfg.dataDir);
|
||||
dCfg.level = 0;
|
||||
dCfg.primary = 1;
|
||||
if (tfsInit(&dCfg, 1) != 0) {
|
||||
dError("failed to init tfs env");
|
||||
dndCleanup(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SVnodeOpt vnodeOpt = {
|
||||
.sver = pDnode->opt.sver,
|
||||
.timezone = pDnode->opt.timezone,
|
||||
.locale = pDnode->opt.locale,
|
||||
.charset = pDnode->opt.charset,
|
||||
.nthreads = pDnode->opt.numOfCommitThreads,
|
||||
.putReqToVQueryQFp = dndPutReqToVQueryQ,
|
||||
};
|
||||
if (vnodeInit(&vnodeOpt) != 0) {
|
||||
dError("failed to init vnode env");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init tfs since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitMgmt(pDnode) != 0) {
|
||||
dError("failed to init dnode");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init mgmt since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitVnodes(pDnode) != 0) {
|
||||
dError("failed to init vnodes");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init vnodes since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitQnode(pDnode) != 0) {
|
||||
dError("failed to init qnode");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init qnode since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitSnode(pDnode) != 0) {
|
||||
dError("failed to init snode");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init snode since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitBnode(pDnode) != 0) {
|
||||
dError("failed to init bnode");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init bnode since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitMnode(pDnode) != 0) {
|
||||
dError("failed to init mnode");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init mnode since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (dndInitTrans(pDnode) != 0) {
|
||||
dError("failed to init transport");
|
||||
dndCleanup(pDnode);
|
||||
dError("failed to init transport since %s", terrstr());
|
||||
dndClose(pDnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
dndSetStat(pDnode, DND_STAT_RUNNING);
|
||||
dndSendStatusReq(pDnode);
|
||||
dndReportStartup(pDnode, "TDengine", "initialized successfully");
|
||||
dInfo("TDengine is initialized successfully, pDnode:%p", pDnode);
|
||||
dInfo("dnode object is created, data:%p", pDnode);
|
||||
|
||||
return pDnode;
|
||||
}
|
||||
|
||||
void dndCleanup(SDnode *pDnode) {
|
||||
void dndClose(SDnode *pDnode) {
|
||||
if (pDnode == NULL) return;
|
||||
|
||||
if (dndGetStat(pDnode) == DND_STAT_STOPPED) {
|
||||
dError("dnode is shutting down");
|
||||
dError("dnode is shutting down, data:%p", pDnode);
|
||||
return;
|
||||
}
|
||||
|
||||
dInfo("start to cleanup TDengine");
|
||||
dInfo("start to close dnode, data:%p", pDnode);
|
||||
dndSetStat(pDnode, DND_STAT_STOPPED);
|
||||
dndCleanupTrans(pDnode);
|
||||
dndStopMgmt(pDnode);
|
||||
|
@ -278,12 +251,66 @@ void dndCleanup(SDnode *pDnode) {
|
|||
dndCleanupQnode(pDnode);
|
||||
dndCleanupVnodes(pDnode);
|
||||
dndCleanupMgmt(pDnode);
|
||||
vnodeClear();
|
||||
tfsDestroy();
|
||||
tfsCleanup();
|
||||
|
||||
dndCloseImp(pDnode);
|
||||
free(pDnode);
|
||||
dInfo("dnode object is closed, data:%p", pDnode);
|
||||
}
|
||||
|
||||
int32_t dndInit(const SDnodeEnvCfg *pCfg) {
|
||||
if (atomic_val_compare_exchange_8(&dndEnv.once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
|
||||
terrno = TSDB_CODE_REPEAT_INIT;
|
||||
dError("failed to init dnode env since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosIgnSIGPIPE();
|
||||
taosBlockSIGPIPE();
|
||||
taosResolveCRC();
|
||||
|
||||
if (rpcInit() != 0) {
|
||||
dError("failed to init rpc since %s", terrstr());
|
||||
dndCleanup();
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walInit() != 0) {
|
||||
dError("failed to init wal since %s", terrstr());
|
||||
dndCleanup();
|
||||
return -1;
|
||||
}
|
||||
|
||||
SVnodeOpt vnodeOpt = {
|
||||
.sver = pCfg->sver,
|
||||
.timezone = pCfg->timezone,
|
||||
.locale = pCfg->locale,
|
||||
.charset = pCfg->charset,
|
||||
.nthreads = pCfg->numOfCommitThreads,
|
||||
.putReqToVQueryQFp = dndPutReqToVQueryQ,
|
||||
};
|
||||
|
||||
if (vnodeInit(&vnodeOpt) != 0) {
|
||||
dError("failed to init vnode since %s", terrstr());
|
||||
dndCleanup();
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memcpy(&dndEnv.cfg, pCfg, sizeof(SDnodeEnvCfg));
|
||||
dInfo("dnode env is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dndCleanup() {
|
||||
if (atomic_val_compare_exchange_8(&dndEnv.once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
|
||||
dError("dnode env is already cleaned up");
|
||||
return;
|
||||
}
|
||||
|
||||
walCleanUp();
|
||||
vnodeCleanup();
|
||||
rpcCleanup();
|
||||
|
||||
dndCleanupEnv(pDnode);
|
||||
free(pDnode);
|
||||
dInfo("TDengine is cleaned up successfully");
|
||||
taosStopCacheRefreshWorker();
|
||||
dInfo("dnode env is cleaned up");
|
||||
}
|
|
@ -86,7 +86,7 @@ void dndSendRedirectRsp(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
dDebug("RPC %p, req:%s is redirected, num:%d use:%d", pReq->handle, TMSG_INFO(msgType), epSet.numOfEps, epSet.inUse);
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
dDebug("mnode index:%d %s:%u", i, epSet.fqdn[i], epSet.port[i]);
|
||||
if (strcmp(epSet.fqdn[i], pDnode->opt.localFqdn) == 0 && epSet.port[i] == pDnode->opt.serverPort) {
|
||||
if (strcmp(epSet.fqdn[i], pDnode->cfg.localFqdn) == 0 && epSet.port[i] == pDnode->cfg.serverPort) {
|
||||
epSet.inUse = (i + 1) % epSet.numOfEps;
|
||||
}
|
||||
|
||||
|
@ -289,8 +289,8 @@ PRASE_DNODE_OVER:
|
|||
if (root != NULL) cJSON_Delete(root);
|
||||
if (fp != NULL) fclose(fp);
|
||||
|
||||
if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->opt.localEp)) {
|
||||
dError("localEp %s different with %s and need reconfigured", pDnode->opt.localEp, pMgmt->file);
|
||||
if (dndIsEpChanged(pDnode, pMgmt->dnodeId, pDnode->cfg.localEp)) {
|
||||
dError("localEp %s different with %s and need reconfigured", pDnode->cfg.localEp, pMgmt->file);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -298,7 +298,7 @@ PRASE_DNODE_OVER:
|
|||
pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp));
|
||||
pMgmt->dnodeEps->num = 1;
|
||||
pMgmt->dnodeEps->eps[0].isMnode = 1;
|
||||
taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
|
||||
taosGetFqdnPortFromEp(pDnode->cfg.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port);
|
||||
}
|
||||
|
||||
dndResetDnodes(pDnode, pMgmt->dnodeEps);
|
||||
|
@ -362,24 +362,24 @@ void dndSendStatusReq(SDnode *pDnode) {
|
|||
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
pStatus->sver = htonl(pDnode->opt.sver);
|
||||
pStatus->sver = htonl(pDnode->env.sver);
|
||||
pStatus->dver = htobe64(pMgmt->dver);
|
||||
pStatus->dnodeId = htonl(pMgmt->dnodeId);
|
||||
pStatus->clusterId = htobe64(pMgmt->clusterId);
|
||||
pStatus->rebootTime = htobe64(pMgmt->rebootTime);
|
||||
pStatus->updateTime = htobe64(pMgmt->updateTime);
|
||||
pStatus->numOfCores = htonl(pDnode->opt.numOfCores);
|
||||
pStatus->numOfSupportVnodes = htonl(pDnode->opt.numOfSupportVnodes);
|
||||
tstrncpy(pStatus->dnodeEp, pDnode->opt.localEp, TSDB_EP_LEN);
|
||||
pStatus->numOfCores = htonl(pDnode->env.numOfCores);
|
||||
pStatus->numOfSupportVnodes = htonl(pDnode->cfg.numOfSupportVnodes);
|
||||
tstrncpy(pStatus->dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
|
||||
|
||||
pStatus->clusterCfg.statusInterval = htonl(pDnode->opt.statusInterval);
|
||||
pStatus->clusterCfg.statusInterval = htonl(pDnode->cfg.statusInterval);
|
||||
pStatus->clusterCfg.checkTime = 0;
|
||||
char timestr[32] = "1970-01-01 00:00:00.00";
|
||||
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
||||
pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime);
|
||||
tstrncpy(pStatus->clusterCfg.timezone, pDnode->opt.timezone, TSDB_TIMEZONE_LEN);
|
||||
tstrncpy(pStatus->clusterCfg.locale, pDnode->opt.locale, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pStatus->clusterCfg.charset, pDnode->opt.charset, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pStatus->clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN);
|
||||
tstrncpy(pStatus->clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pStatus->clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN);
|
||||
taosRUnLockLatch(&pMgmt->latch);
|
||||
|
||||
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
|
||||
|
@ -485,7 +485,7 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
|||
static void *dnodeThreadRoutine(void *param) {
|
||||
SDnode *pDnode = param;
|
||||
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
|
||||
int32_t ms = pDnode->opt.statusInterval * 1000;
|
||||
int32_t ms = pDnode->cfg.statusInterval * 1000;
|
||||
|
||||
while (true) {
|
||||
pthread_testcancel();
|
||||
|
|
|
@ -247,7 +247,7 @@ static bool dndNeedDeployMnode(SDnode *pDnode) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (strcmp(pDnode->opt.localEp, pDnode->opt.firstEp) != 0) {
|
||||
if (strcmp(pDnode->cfg.localEp, pDnode->cfg.firstEp) != 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -266,15 +266,15 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
pOption->putReqToMWriteQFp = dndPutMsgToMWriteQ;
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
pOption->cfg.sver = pDnode->opt.sver;
|
||||
pOption->cfg.enableTelem = pDnode->opt.enableTelem;
|
||||
pOption->cfg.statusInterval = pDnode->opt.statusInterval;
|
||||
pOption->cfg.shellActivityTimer = pDnode->opt.shellActivityTimer;
|
||||
pOption->cfg.timezone = pDnode->opt.timezone;
|
||||
pOption->cfg.charset = pDnode->opt.charset;
|
||||
pOption->cfg.locale = pDnode->opt.locale;
|
||||
pOption->cfg.gitinfo = pDnode->opt.gitinfo;
|
||||
pOption->cfg.buildinfo = pDnode->opt.buildinfo;
|
||||
pOption->cfg.sver = pDnode->env.sver;
|
||||
pOption->cfg.enableTelem = pDnode->env.enableTelem;
|
||||
pOption->cfg.statusInterval = pDnode->cfg.statusInterval;
|
||||
pOption->cfg.shellActivityTimer = pDnode->cfg.shellActivityTimer;
|
||||
pOption->cfg.timezone = pDnode->env.timezone;
|
||||
pOption->cfg.charset = pDnode->env.charset;
|
||||
pOption->cfg.locale = pDnode->env.locale;
|
||||
pOption->cfg.gitinfo = pDnode->env.gitinfo;
|
||||
pOption->cfg.buildinfo = pDnode->env.buildinfo;
|
||||
}
|
||||
|
||||
static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
||||
|
@ -283,8 +283,8 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) {
|
|||
pOption->selfIndex = 0;
|
||||
SReplica *pReplica = &pOption->replicas[0];
|
||||
pReplica->id = 1;
|
||||
pReplica->port = pDnode->opt.serverPort;
|
||||
memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN);
|
||||
pReplica->port = pDnode->cfg.serverPort;
|
||||
memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN);
|
||||
|
||||
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
|
||||
pMgmt->selfIndex = pOption->selfIndex;
|
||||
|
|
|
@ -185,7 +185,7 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
|
|||
pOption->sendRedirectRspFp = dndSendRedirectRsp;
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
pOption->cfg.sver = pDnode->opt.sver;
|
||||
pOption->sver = pDnode->env.sver;
|
||||
}
|
||||
|
||||
static int32_t dndOpenQnode(SDnode *pDnode) {
|
||||
|
|
|
@ -179,7 +179,7 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
|
|||
pOption->sendRedirectRspFp = dndSendRedirectRsp;
|
||||
pOption->dnodeId = dndGetDnodeId(pDnode);
|
||||
pOption->clusterId = dndGetClusterId(pDnode);
|
||||
pOption->cfg.sver = pDnode->opt.sver;
|
||||
pOption->sver = pDnode->env.sver;
|
||||
}
|
||||
|
||||
static int32_t dndOpenSnode(SDnode *pDnode) {
|
||||
|
|
|
@ -176,7 +176,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
|
|||
rpcInit.cfp = dndProcessResponse;
|
||||
rpcInit.sessions = 1024;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
|
||||
rpcInit.idleTime = pDnode->cfg.shellActivityTimer * 1000;
|
||||
rpcInit.user = INTERNAL_USER;
|
||||
rpcInit.ckey = INTERNAL_CKEY;
|
||||
rpcInit.secret = INTERNAL_SECRET;
|
||||
|
@ -325,20 +325,20 @@ static int32_t dndInitServer(SDnode *pDnode) {
|
|||
STransMgmt *pMgmt = &pDnode->tmgmt;
|
||||
dndInitMsgFp(pMgmt);
|
||||
|
||||
int32_t numOfThreads = (int32_t)((pDnode->opt.numOfCores * pDnode->opt.numOfThreadsPerCore) / 2.0);
|
||||
int32_t numOfThreads = (int32_t)((pDnode->env.numOfCores * pDnode->cfg.numOfThreadsPerCore) / 2.0);
|
||||
if (numOfThreads < 1) {
|
||||
numOfThreads = 1;
|
||||
}
|
||||
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = pDnode->opt.serverPort;
|
||||
rpcInit.localPort = pDnode->cfg.serverPort;
|
||||
rpcInit.label = "DND-S";
|
||||
rpcInit.numOfThreads = numOfThreads;
|
||||
rpcInit.cfp = dndProcessRequest;
|
||||
rpcInit.sessions = pDnode->opt.maxShellConns;
|
||||
rpcInit.sessions = pDnode->cfg.maxShellConns;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = pDnode->opt.shellActivityTimer * 1000;
|
||||
rpcInit.idleTime = pDnode->cfg.shellActivityTimer * 1000;
|
||||
rpcInit.afp = dndRetrieveUserAuthInfo;
|
||||
rpcInit.parent = pDnode;
|
||||
|
||||
|
|
|
@ -420,7 +420,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
|
|||
|
||||
pMgmt->totalVnodes = numOfVnodes;
|
||||
|
||||
int32_t threadNum = pDnode->opt.numOfCores;
|
||||
int32_t threadNum = pDnode->env.numOfCores;
|
||||
int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
|
||||
|
||||
SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread));
|
||||
|
@ -904,11 +904,11 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
|
|||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||
|
||||
int32_t maxFetchThreads = 4;
|
||||
int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores);
|
||||
int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1);
|
||||
int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->env.numOfCores);
|
||||
int32_t minQueryThreads = MAX((int32_t)(pDnode->env.numOfCores * pDnode->cfg.ratioOfQueryCores), 1);
|
||||
int32_t maxQueryThreads = minQueryThreads;
|
||||
int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1);
|
||||
int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1);
|
||||
int32_t maxWriteThreads = MAX(pDnode->env.numOfCores, 1);
|
||||
int32_t maxSyncThreads = MAX(pDnode->env.numOfCores / 2, 1);
|
||||
|
||||
SWorkerPool *pPool = &pMgmt->queryPool;
|
||||
pPool->name = "vnode-query";
|
||||
|
|
|
@ -24,7 +24,7 @@ class TestServer {
|
|||
bool DoStart();
|
||||
|
||||
private:
|
||||
SDnodeOpt BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
|
||||
SDnodeObjCfg BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp);
|
||||
|
||||
private:
|
||||
SDnode* pDnode;
|
||||
|
|
|
@ -22,30 +22,27 @@ void* serverLoop(void* param) {
|
|||
}
|
||||
}
|
||||
|
||||
SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
|
||||
SDnodeOpt option = {0};
|
||||
option.sver = 1;
|
||||
option.numOfCores = 1;
|
||||
option.numOfSupportVnodes = 16;
|
||||
option.numOfCommitThreads = 1;
|
||||
option.statusInterval = 1;
|
||||
option.numOfThreadsPerCore = 1;
|
||||
option.ratioOfQueryCores = 1;
|
||||
option.maxShellConns = 1000;
|
||||
option.shellActivityTimer = 30;
|
||||
option.serverPort = port;
|
||||
strcpy(option.dataDir, path);
|
||||
snprintf(option.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
|
||||
snprintf(option.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
|
||||
snprintf(option.firstEp, TSDB_EP_LEN, "%s", firstEp);
|
||||
return option;
|
||||
SDnodeObjCfg TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
|
||||
SDnodeObjCfg cfg = {0};
|
||||
cfg.numOfSupportVnodes = 16;
|
||||
cfg.statusInterval = 1;
|
||||
cfg.numOfThreadsPerCore = 1;
|
||||
cfg.ratioOfQueryCores = 1;
|
||||
cfg.maxShellConns = 1000;
|
||||
cfg.shellActivityTimer = 30;
|
||||
cfg.serverPort = port;
|
||||
strcpy(cfg.dataDir, path);
|
||||
snprintf(cfg.localEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
|
||||
snprintf(cfg.localFqdn, TSDB_FQDN_LEN, "%s", fqdn);
|
||||
snprintf(cfg.firstEp, TSDB_EP_LEN, "%s", firstEp);
|
||||
return cfg;
|
||||
}
|
||||
|
||||
bool TestServer::DoStart() {
|
||||
SDnodeOpt option = BuildOption(path, fqdn, port, firstEp);
|
||||
SDnodeObjCfg cfg = BuildOption(path, fqdn, port, firstEp);
|
||||
taosMkDir(path);
|
||||
|
||||
pDnode = dndInit(&option);
|
||||
pDnode = dndCreate(&cfg);
|
||||
if (pDnode != NULL) {
|
||||
return false;
|
||||
}
|
||||
|
@ -81,7 +78,7 @@ void TestServer::Stop() {
|
|||
}
|
||||
|
||||
if (pDnode != NULL) {
|
||||
dndCleanup(pDnode);
|
||||
dndClose(pDnode);
|
||||
pDnode = NULL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,11 @@ void Testbase::InitLog(const char* path) {
|
|||
}
|
||||
|
||||
void Testbase::Init(const char* path, int16_t port) {
|
||||
SDnodeEnvCfg cfg = {0};
|
||||
cfg.numOfCommitThreads = 1;
|
||||
cfg.numOfCores = 1;
|
||||
dndInit(&cfg);
|
||||
|
||||
char fqdn[] = "localhost";
|
||||
char firstEp[TSDB_EP_LEN] = {0};
|
||||
snprintf(firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port);
|
||||
|
@ -56,6 +61,7 @@ void Testbase::Init(const char* path, int16_t port) {
|
|||
void Testbase::Cleanup() {
|
||||
server.Stop();
|
||||
client.Cleanup();
|
||||
dndCleanup();
|
||||
}
|
||||
|
||||
void Testbase::Restart() { server.Restart(); }
|
||||
|
|
|
@ -29,12 +29,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SQnode {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
SQnodeCfg cfg;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
SendRedirectRspFp sendRedirectRspFp;
|
||||
SQnodeOpt opt;
|
||||
} SQnode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -29,12 +29,7 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct SSnode {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
SSnodeCfg cfg;
|
||||
SendReqToDnodeFp sendReqToDnodeFp;
|
||||
SendReqToMnodeFp sendReqToMnodeFp;
|
||||
SendRedirectRspFp sendRedirectRspFp;
|
||||
SSnodeOpt cfg;
|
||||
} SSnode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -56,13 +56,11 @@ int vnodeInit(const SVnodeOpt *pOption) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void vnodeClear() {
|
||||
void vnodeCleanup() {
|
||||
if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_UNINITIALIZED) {
|
||||
return;
|
||||
}
|
||||
|
||||
walCleanUp();
|
||||
|
||||
// Stop commit handler
|
||||
pthread_mutex_lock(&(vnodeMgr.mutex));
|
||||
vnodeMgr.stop = true;
|
||||
|
|
|
@ -227,7 +227,7 @@ TEST(vnodeApiTest, vnode_simple_create_table_test) {
|
|||
|
||||
// CLOSE THE VNODE
|
||||
vnodeClose(pVnode);
|
||||
vnodeClear();
|
||||
vnodeCleanup();
|
||||
|
||||
taosArrayDestroy(pMsgArr);
|
||||
}
|
||||
|
@ -279,7 +279,7 @@ TEST(vnodeApiTest, vnode_simple_insert_test) {
|
|||
|
||||
// Close the vnode
|
||||
vnodeClose(pVnode);
|
||||
vnodeClear();
|
||||
vnodeCleanup();
|
||||
|
||||
taosArrayDestroy(pMsgArr);
|
||||
}
|
|
@ -85,19 +85,19 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
|||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (pfs->map == NULL) {
|
||||
terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
|
||||
tfsDestroy();
|
||||
tfsCleanup();
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int idisk = 0; idisk < ndisk; idisk++) {
|
||||
if (tfsMount(pDiskCfg + idisk) < 0) {
|
||||
tfsDestroy();
|
||||
tfsCleanup();
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tfsCheck() < 0) {
|
||||
tfsDestroy();
|
||||
tfsCleanup();
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -109,7 +109,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tfsDestroy() {
|
||||
void tfsCleanup() {
|
||||
taosHashCleanup(pfs->map);
|
||||
pfs->map = NULL;
|
||||
|
||||
|
|
|
@ -80,6 +80,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization")
|
||||
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
|
||||
|
|
|
@ -49,7 +49,7 @@ typedef struct {
|
|||
} STierMeta;
|
||||
|
||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
||||
void tfsDestroy();
|
||||
void tfsCleanup();
|
||||
void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
|
||||
void tfsGetMeta(SFSMeta *pMeta);
|
||||
void tfsAllocDisk(int expLevel, int *level, int *id);
|
||||
|
|
Loading…
Reference in New Issue