From 111bce9eab8695a1aa87c2ce1147a2c7224a750d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 9 Oct 2021 21:26:09 +0800 Subject: [PATCH] [TD-10430] add global variables in dnode --- include/server/mnode/mnode.h | 2 +- include/server/vnode/vnode.h | 2 +- include/util/tstep.h | 6 +- include/util/tworker.h | 56 +++-- source/server/dnode/inc/dnodeCfg.h | 21 +- source/server/dnode/inc/dnodeCheck.h | 6 +- source/server/dnode/inc/dnodeEps.h | 18 +- source/server/dnode/inc/dnodeInt.h | 26 +-- source/server/dnode/inc/dnodeMain.h | 13 +- source/server/dnode/inc/dnodeMnodeEps.h | 19 +- source/server/dnode/inc/dnodeStatus.h | 10 +- source/server/dnode/inc/dnodeTelem.h | 17 +- source/server/dnode/inc/dnodeTrans.h | 16 +- source/server/dnode/src/dnodeCfg.c | 129 ++++++------ source/server/dnode/src/dnodeCheck.c | 16 +- source/server/dnode/src/dnodeEps.c | 171 +++++++-------- source/server/dnode/src/dnodeInt.c | 67 +++--- source/server/dnode/src/dnodeMain.c | 83 ++++---- source/server/dnode/src/dnodeMnodeEps.c | 151 +++++++------ source/server/dnode/src/dnodeStatus.c | 70 +++---- source/server/dnode/src/dnodeTelem.c | 100 ++++----- source/server/dnode/src/dnodeTrans.c | 268 ++++++++++++------------ source/util/src/tstep.c | 15 +- source/util/src/tworker.c | 227 ++++++++++++++++---- 24 files changed, 768 insertions(+), 741 deletions(-) diff --git a/include/server/mnode/mnode.h b/include/server/mnode/mnode.h index 7ea50a92ce..e78994fc9c 100644 --- a/include/server/mnode/mnode.h +++ b/include/server/mnode/mnode.h @@ -118,7 +118,7 @@ typedef struct { int32_t mnodeGetStatistics(SMnodeStat *stat); /** - * Get the auth information. + * Get the statistical information of Mnode. * * @param user, username. * @param spi, security parameter index. diff --git a/include/server/vnode/vnode.h b/include/server/vnode/vnode.h index 8fd4fd433f..00decfe338 100644 --- a/include/server/vnode/vnode.h +++ b/include/server/vnode/vnode.h @@ -49,7 +49,7 @@ typedef struct { } SVnodeFp; typedef struct { - SVnodeFp fp; + SVnodeFp fp; } SVnodePara; /** diff --git a/include/util/tstep.h b/include/util/tstep.h index 87e95edd97..ffc3f6ccf7 100644 --- a/include/util/tstep.h +++ b/include/util/tstep.h @@ -20,14 +20,14 @@ extern "C" { #endif -typedef int32_t (*InitFp)(void **obj); -typedef void (*CleanupFp)(void **obj); +typedef int32_t (*InitFp)(); +typedef void (*CleanupFp)(); typedef void (*ReportFp)(char *name, char *desc); struct SSteps *taosStepInit(int32_t maxsize, ReportFp fp); int32_t taosStepExec(struct SSteps *steps); void taosStepCleanup(struct SSteps *steps); -int32_t taosStepAdd(struct SSteps *steps, char *name, void **obj, InitFp initFp, CleanupFp cleanupFp); +int32_t taosStepAdd(struct SSteps *steps, char *name, InitFp initFp, CleanupFp cleanupFp); #ifdef __cplusplus } diff --git a/include/util/tworker.h b/include/util/tworker.h index 156ced383e..fbe2fc26f2 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -13,21 +13,23 @@ * along with this program. If not, see . */ -#ifndef _TD_UTIL_WORKER_H -#define _TD_UTIL_WORKER_H +#ifndef TDENGINE_TWORKER_H +#define TDENGINE_TWORKER_H #ifdef __cplusplus extern "C" { #endif -typedef int32_t (*ProcessReqFp)(void *ahandle, void *msg); -typedef void (*SendRspFp)(void *ahandle, void *msg, int32_t qtype, int32_t code); +typedef int32_t (*ProcessStartFp)(void *ahandle, void *pMsg, int32_t qtype); +typedef void (*ProcessEndFp)(void *ahandle, void *pMsg, int32_t qtype, int32_t code); -struct SWorkerPool; +typedef bool (*ProcessWriteStartFp)(void *ahandle, void *pMsg, int32_t qtype); +typedef void (*ProcessWriteSyncFp)(void *ahandle, int32_t code); +typedef void (*ProcessWriteEndFp)(void *ahandle, void *pMsg, int32_t qtype); -typedef struct { - pthread_t thread; // thread +typedef struct SWorker { int32_t id; // worker ID + pthread_t thread; // thread struct SWorkerPool *pool; } SWorker; @@ -35,21 +37,45 @@ typedef struct SWorkerPool { int32_t max; // max number of workers int32_t min; // min number of workers int32_t num; // current number of workers - void * qset; + taos_qset qset; const char * name; + ProcessStartFp startFp; + ProcessEndFp endFp; SWorker * workers; - ProcessReqFp reqFp; - SendRspFp rspFp; pthread_mutex_t mutex; } SWorkerPool; -int32_t tWorkerInit(SWorkerPool *pPool); -void tWorkerCleanup(SWorkerPool *pPool); -void * tWorkerAllocQueue(SWorkerPool *pPool, void *ahandle); -void tWorkerFreeQueue(SWorkerPool *pPool, void *pQueue); +typedef struct SWriteWorker { + int32_t id; // worker id + pthread_t thread; // thread + taos_qall qall; + taos_qset qset; // queue set + struct SWriteWorkerPool *pool; +} SWriteWorker; + +typedef struct SWriteWorkerPool { + int32_t max; // max number of workers + int32_t nextId; // from 0 to max-1, cyclic + const char * name; + ProcessWriteStartFp startFp; + ProcessWriteSyncFp syncFp; + ProcessWriteEndFp endFp; + SWriteWorker * workers; + pthread_mutex_t mutex; +} SWriteWorkerPool; + +int32_t tWorkerInit(SWorkerPool *pool); +void tWorkerCleanup(SWorkerPool *pool); +taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle); +void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue); + +int32_t tWriteWorkerInit(SWriteWorkerPool *pool); +void tWriteWorkerCleanup(SWriteWorkerPool *pool); +taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle); +void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue); #ifdef __cplusplus } #endif -#endif /*_TD_UTIL_WORKER_H*/ +#endif diff --git a/source/server/dnode/inc/dnodeCfg.h b/source/server/dnode/inc/dnodeCfg.h index 1565e7649a..eda6231579 100644 --- a/source/server/dnode/inc/dnodeCfg.h +++ b/source/server/dnode/inc/dnodeCfg.h @@ -21,21 +21,14 @@ extern "C" { #endif #include "dnodeInt.h" -typedef struct SDnCfg { - int32_t dnodeId; - int32_t dropped; - char clusterId[TSDB_CLUSTER_ID_LEN]; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} SDnCfg; -int32_t dnodeInitCfg(SDnCfg **cfg); -void dnodeCleanupCfg(SDnCfg **cfg); -void dnodeUpdateCfg(SDnCfg *cfg, SDnodeCfg *data); -int32_t dnodeGetDnodeId(SDnCfg *cfg); -void dnodeGetClusterId(SDnCfg *cfg, char *clusterId); -void dnodeGetCfg(SDnCfg *cfg, int32_t *dnodeId, char *clusterId); -void dnodeSetDropped(SDnCfg *cfg); +int32_t dnodeInitCfg(); +void dnodeCleanupCfg(); +void dnodeUpdateCfg(SDnodeCfg *data); +int32_t dnodeGetDnodeId(); +void dnodeGetClusterId(char *clusterId); +void dnodeGetCfg(int32_t *dnodeId, char *clusterId); +void dnodeSetDropped(); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeCheck.h b/source/server/dnode/inc/dnodeCheck.h index 29172ba4df..b6fbf1eabd 100644 --- a/source/server/dnode/inc/dnodeCheck.h +++ b/source/server/dnode/inc/dnodeCheck.h @@ -21,11 +21,9 @@ extern "C" { #endif #include "dnodeInt.h" -typedef struct SDnCheck { -} SDnCheck; -int32_t dnodeInitCheck(SDnCheck **check); -void dnodeCleanupCheck(SDnCheck **check); +int32_t dnodeInitCheck(); +void dnodeCleanupCheck(); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeEps.h b/source/server/dnode/inc/dnodeEps.h index 8019a81933..4f25884021 100644 --- a/source/server/dnode/inc/dnodeEps.h +++ b/source/server/dnode/inc/dnodeEps.h @@ -19,22 +19,12 @@ #ifdef __cplusplus extern "C" { #endif -#include "thash.h" #include "dnodeInt.h" -typedef struct SDnEps { - int32_t dnodeId; - int32_t dnodeNum; - SDnodeEp * dnodeList; - SHashObj * dnodeHash; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} SDnEps; - -int32_t dnodeInitEps(SDnEps **eps); -void dnodeCleanupEps(SDnEps **eps); -void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data); -bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr); +int32_t dnodeInitEps(); +void dnodeCleanupEps(); +void dnodeUpdateEps(SDnodeEps *data); +bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); #ifdef __cplusplus diff --git a/source/server/dnode/inc/dnodeInt.h b/source/server/dnode/inc/dnodeInt.h index 9b56147ca0..82cdfb52bf 100644 --- a/source/server/dnode/inc/dnodeInt.h +++ b/source/server/dnode/inc/dnodeInt.h @@ -19,36 +19,12 @@ #ifdef __cplusplus extern "C" { #endif -#include "taoserror.h" #include "taosmsg.h" -#include "tglobal.h" #include "tlog.h" #include "trpc.h" -#include "tstep.h" #include "dnode.h" -struct SDnCfg; -struct SDnCheck; -struct SDnEps; -struct SDnMnEps; -struct SDnStatus; -struct SDnTelem; -struct SDnTrans; -struct SDnMain; - -typedef struct SDnode { - struct SSteps* steps; - struct SDnCfg* cfg; - struct SDnCheck* check; - struct SDnEps* eps; - struct SDnMnEps* meps; - struct SDnStatus* status; - struct SDnTelem* telem; - struct SDnTrans* trans; - struct SDnMain* main; -} SDnode; - -SDnode* dnodeInst(); +extern int32_t dDebugFlag; #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} diff --git a/source/server/dnode/inc/dnodeMain.h b/source/server/dnode/inc/dnodeMain.h index 08f6e10830..245ede0001 100644 --- a/source/server/dnode/inc/dnodeMain.h +++ b/source/server/dnode/inc/dnodeMain.h @@ -27,14 +27,8 @@ typedef enum { TD_RUN_STAT_STOPPED } RunStat; -typedef struct SDnMain { - RunStat runStatus; - void * dnodeTimer; - SStartupStep startup; -} SDnMain; - -int32_t dnodeInitMain(SDnMain **main); -void dnodeCleanupMain(SDnMain **main); +int32_t dnodeInitMain(); +void dnodeCleanupMain(); int32_t dnodeInitStorage(); void dnodeCleanupStorage(); void dnodeReportStartup(char *name, char *desc); @@ -42,6 +36,9 @@ void dnodeReportStartupFinished(char *name, char *desc); void dnodeProcessStartupReq(SRpcMsg *pMsg); void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); +RunStat dnodeGetRunStat(); +void dnodeSetRunStat(); +void* dnodeGetTimer(); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeMnodeEps.h b/source/server/dnode/inc/dnodeMnodeEps.h index a70c621046..c890f6921d 100644 --- a/source/server/dnode/inc/dnodeMnodeEps.h +++ b/source/server/dnode/inc/dnodeMnodeEps.h @@ -21,19 +21,12 @@ extern "C" { #endif #include "dnodeInt.h" -typedef struct SDnMnEps { - SRpcEpSet mnodeEpSet; - SMInfos mnodeInfos; - char file[PATH_MAX + 20]; - pthread_mutex_t mutex; -} SDnMnEps; - -int32_t dnodeInitMnodeEps(SDnMnEps **meps); -void dnodeCleanupMnodeEps(SDnMnEps **meps); -void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *pMinfos); -void dnodeUpdateMnodeFromPeer(SDnMnEps *meps, SRpcEpSet *pEpSet); -void dnodeGetEpSetForPeer(SDnMnEps *meps, SRpcEpSet *epSet); -void dnodeGetEpSetForShell(SDnMnEps *meps, SRpcEpSet *epSet); +int32_t dnodeInitMnodeEps(); +void dnodeCleanupMnodeEps(); +void dnodeUpdateMnodeFromStatus(SMInfos *pMinfos); +void dnodeUpdateMnodeFromPeer(SRpcEpSet *pEpSet); +void dnodeGetEpSetForPeer(SRpcEpSet *epSet); +void dnodeGetEpSetForShell(SRpcEpSet *epSet); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); #ifdef __cplusplus diff --git a/source/server/dnode/inc/dnodeStatus.h b/source/server/dnode/inc/dnodeStatus.h index 65a349ba6d..f0473b93f1 100644 --- a/source/server/dnode/inc/dnodeStatus.h +++ b/source/server/dnode/inc/dnodeStatus.h @@ -21,14 +21,8 @@ extern "C" { #endif #include "dnodeInt.h" -typedef struct SDnStatus { - void * dnodeTimer; - void * statusTimer; - uint32_t rebootTime; -} SDnStatus; - -int32_t dnodeInitStatus(SDnStatus **status); -void dnodeCleanupStatus(SDnStatus **status); +int32_t dnodeInitStatus(); +void dnodeCleanupStatus(); void dnodeProcessStatusRsp(SRpcMsg *pMsg); #ifdef __cplusplus diff --git a/source/server/dnode/inc/dnodeTelem.h b/source/server/dnode/inc/dnodeTelem.h index 94356fffba..4945879e64 100644 --- a/source/server/dnode/inc/dnodeTelem.h +++ b/source/server/dnode/inc/dnodeTelem.h @@ -21,21 +21,8 @@ extern "C" { #endif #include "dnodeInt.h" -/* - * sem_timedwait is NOT implemented on MacOSX - * thus we use pthread_mutex_t/pthread_cond_t to simulate - */ -typedef struct SDnTelem { - bool enable; - pthread_mutex_t lock; - pthread_cond_t cond; - volatile int32_t exit; - pthread_t thread; - char email[TSDB_FQDN_LEN]; -} SDnTelem; - -int32_t dnodeInitTelem(SDnTelem **telem); -void dnodeCleanupTelem(SDnTelem **telem); +int32_t dnodeInitTelem(); +void dnodeCleanupTelem(); #ifdef __cplusplus } diff --git a/source/server/dnode/inc/dnodeTrans.h b/source/server/dnode/inc/dnodeTrans.h index d9016f0c7b..631c69d11c 100644 --- a/source/server/dnode/inc/dnodeTrans.h +++ b/source/server/dnode/inc/dnodeTrans.h @@ -21,20 +21,8 @@ extern "C" { #endif #include "dnodeInt.h" -typedef void (*RpcMsgFp)( SRpcMsg *pMsg); - -typedef struct SDnTrans { - void * serverRpc; - void * clientRpc; - void * shellRpc; - int32_t queryReqNum; - int32_t submitReqNum; - RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; - RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX]; -} SDnTrans; - -int32_t dnodeInitTrans(SDnTrans **rans); -void dnodeCleanupTrans(SDnTrans **trans); +int32_t dnodeInitTrans(); +void dnodeCleanupTrans(); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); diff --git a/source/server/dnode/src/dnodeCfg.c b/source/server/dnode/src/dnodeCfg.c index ee8a17db08..f9ed491464 100644 --- a/source/server/dnode/src/dnodeCfg.c +++ b/source/server/dnode/src/dnodeCfg.c @@ -16,56 +16,65 @@ #define _DEFAULT_SOURCE #include "os.h" #include "cJSON.h" +#include "tglobal.h" #include "dnodeCfg.h" -static int32_t dnodeReadCfg(SDnCfg *cfg) { +static struct DnCfg { + int32_t dnodeId; + int32_t dropped; + char clusterId[TSDB_CLUSTER_ID_LEN]; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} tsDcfg; + +static int32_t dnodeReadCfg() { int32_t len = 0; int32_t maxLen = 200; char * content = calloc(1, maxLen + 1); cJSON * root = NULL; FILE * fp = NULL; - fp = fopen(cfg->file, "r"); + fp = fopen(tsDcfg.file, "r"); if (!fp) { - dDebug("file %s not exist", cfg->file); + dDebug("file %s not exist", tsDcfg.file); goto PARSE_CFG_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", cfg->file); + dError("failed to read %s since content is null", tsDcfg.file); goto PARSE_CFG_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", cfg->file); + dError("failed to read %s since invalid json format", tsDcfg.file); goto PARSE_CFG_OVER; } cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read %s since dnodeId not found", cfg->file); + dError("failed to read %s since dnodeId not found", tsDcfg.file); goto PARSE_CFG_OVER; } - cfg->dnodeId = (int32_t)dnodeId->valueint; + tsDcfg.dnodeId = (int32_t)dnodeId->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", cfg->file); + dError("failed to read %s since dropped not found", tsDcfg.file); goto PARSE_CFG_OVER; } - cfg->dropped = (int32_t)dropped->valueint; + tsDcfg.dropped = (int32_t)dropped->valueint; cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId"); if (!clusterId || clusterId->type != cJSON_String) { - dError("failed to read %s since clusterId not found", cfg->file); + dError("failed to read %s since clusterId not found", tsDcfg.file); goto PARSE_CFG_OVER; } - tstrncpy(cfg->clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN); + tstrncpy(tsDcfg.clusterId, clusterId->valuestring, TSDB_CLUSTER_ID_LEN); - dInfo("successed to read %s", cfg->file); + dInfo("successed to read %s", tsDcfg.file); PARSE_CFG_OVER: if (content != NULL) free(content); @@ -76,10 +85,10 @@ PARSE_CFG_OVER: return 0; } -static int32_t dnodeWriteCfg(SDnCfg *cfg) { - FILE *fp = fopen(cfg->file, "w"); +static int32_t dnodeWriteCfg() { + FILE *fp = fopen(tsDcfg.file, "w"); if (!fp) { - dError("failed to write %s since %s", cfg->file, strerror(errno)); + dError("failed to write %s since %s", tsDcfg.file, strerror(errno)); return -1; } @@ -88,9 +97,9 @@ static int32_t dnodeWriteCfg(SDnCfg *cfg) { char * content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", cfg->dnodeId); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", cfg->dropped); - len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", cfg->clusterId); + len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", tsDcfg.dnodeId); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", tsDcfg.dropped); + len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%s\"\n", tsDcfg.clusterId); len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); @@ -99,27 +108,23 @@ static int32_t dnodeWriteCfg(SDnCfg *cfg) { free(content); terrno = 0; - dInfo("successed to write %s", cfg->file); + dInfo("successed to write %s", tsDcfg.file); return 0; } -int32_t dnodeInitCfg(SDnCfg **out) { - SDnCfg* cfg = calloc(1, sizeof(SDnCfg)); - if (cfg == NULL) return -1; - - cfg->dnodeId = 0; - cfg->dropped = 0; - cfg->clusterId[0] = 0; - snprintf(cfg->file, sizeof(cfg->file), "%s/dnodeCfg.json", tsDnodeDir); - pthread_mutex_init(&cfg->mutex, NULL); - *out = cfg; - - int32_t ret = dnodeReadCfg(cfg); +int32_t dnodeInitCfg() { + tsDcfg.dnodeId = 0; + tsDcfg.dropped = 0; + tsDcfg.clusterId[0] = 0; + snprintf(tsDcfg.file, sizeof(tsDcfg.file), "%s/dnodeCfg.json", tsDnodeDir); + pthread_mutex_init(&tsDcfg.mutex, NULL); + + int32_t ret = dnodeReadCfg(); if (ret == 0) { dInfo("dnode cfg is initialized"); } - if (cfg->dropped) { + if (tsDcfg.dropped) { dInfo("dnode is dropped and start to exit"); return -1; } @@ -127,51 +132,47 @@ int32_t dnodeInitCfg(SDnCfg **out) { return ret; } -void dnodeCleanupCfg(SDnCfg **out) { - SDnCfg* cfg = *out; - *out = NULL; - - pthread_mutex_destroy(&cfg->mutex); - free(cfg); +void dnodeCleanupCfg() { + pthread_mutex_destroy(&tsDcfg.mutex); } -void dnodeUpdateCfg(SDnCfg *cfg, SDnodeCfg *data) { - if (cfg == NULL || cfg->dnodeId == 0) return; +void dnodeUpdateCfg(SDnodeCfg *data) { + if (tsDcfg.dnodeId != 0) return; - pthread_mutex_lock(&cfg->mutex); + pthread_mutex_lock(&tsDcfg.mutex); - cfg->dnodeId = data->dnodeId; - tstrncpy(cfg->clusterId, data->clusterId, TSDB_CLUSTER_ID_LEN); - dInfo("dnodeId is set to %d, clusterId is set to %s", cfg->dnodeId, cfg->clusterId); + tsDcfg.dnodeId = data->dnodeId; + tstrncpy(tsDcfg.clusterId, data->clusterId, TSDB_CLUSTER_ID_LEN); + dInfo("dnodeId is set to %d, clusterId is set to %s", data->dnodeId, data->clusterId); - dnodeWriteCfg(cfg); - pthread_mutex_unlock(&cfg->mutex); + dnodeWriteCfg(); + pthread_mutex_unlock(&tsDcfg.mutex); } -void dnodeSetDropped(SDnCfg *cfg) { - pthread_mutex_lock(&cfg->mutex); - cfg->dropped = 1; - dnodeWriteCfg(cfg); - pthread_mutex_unlock(&cfg->mutex); +void dnodeSetDropped() { + pthread_mutex_lock(&tsDcfg.mutex); + tsDcfg.dropped = 1; + dnodeWriteCfg(); + pthread_mutex_unlock(&tsDcfg.mutex); } -int32_t dnodeGetDnodeId(SDnCfg *cfg) { +int32_t dnodeGetDnodeId() { int32_t dnodeId = 0; - pthread_mutex_lock(&cfg->mutex); - dnodeId = cfg->dnodeId; - pthread_mutex_unlock(&cfg->mutex); + pthread_mutex_lock(&tsDcfg.mutex); + dnodeId = tsDcfg.dnodeId; + pthread_mutex_unlock(&tsDcfg.mutex); return dnodeId; } -void dnodeGetClusterId(SDnCfg *cfg, char *clusterId) { - pthread_mutex_lock(&cfg->mutex); - tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); - pthread_mutex_unlock(&cfg->mutex); +void dnodeGetClusterId(char *clusterId) { + pthread_mutex_lock(&tsDcfg.mutex); + tstrncpy(clusterId, tsDcfg.clusterId, TSDB_CLUSTER_ID_LEN); + pthread_mutex_unlock(&tsDcfg.mutex); } -void dnodeGetCfg(SDnCfg *cfg, int32_t *dnodeId, char *clusterId) { - pthread_mutex_lock(&cfg->mutex); - *dnodeId = cfg->dnodeId; - tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); - pthread_mutex_unlock(&cfg->mutex); +void dnodeGetCfg(int32_t *dnodeId, char *clusterId) { + pthread_mutex_lock(&tsDcfg.mutex); + *dnodeId = tsDcfg.dnodeId; + tstrncpy(clusterId, tsDcfg.clusterId, TSDB_CLUSTER_ID_LEN); + pthread_mutex_unlock(&tsDcfg.mutex); } diff --git a/source/server/dnode/src/dnodeCheck.c b/source/server/dnode/src/dnodeCheck.c index 8f561b1ea0..b59e4bd4e0 100644 --- a/source/server/dnode/src/dnodeCheck.c +++ b/source/server/dnode/src/dnodeCheck.c @@ -118,7 +118,7 @@ static int32_t dnodeCheckMem() { } static int32_t dnodeCheckDisk() { -#if 0 +#if 0 taosGetDisk(); if (tsAvailDataDirGB < tsMinimalDataDirGB) { @@ -145,12 +145,7 @@ static int32_t dnodeCheckAccess() { return 0; } static int32_t dnodeCheckVersion() { return 0; } static int32_t dnodeCheckDatafile() { return 0; } -int32_t dnodeInitCheck(SDnCheck **out) { - SDnCheck *check = calloc(1, sizeof(SDnCheck)); - if (check == NULL) return -1; - - *out = check; - +int32_t dnodeInitCheck() { if (dnodeCheckNetwork() != 0) { dError("failed to check network"); return -1; @@ -195,9 +190,4 @@ int32_t dnodeInitCheck(SDnCheck **out) { return 0; } -void dnodeCleanupCheck(SDnCheck **out) { - SDnCheck *check = *out; - *out = NULL; - - free(check); -} \ No newline at end of file +void dnodeCleanupCheck() {} \ No newline at end of file diff --git a/source/server/dnode/src/dnodeEps.c b/source/server/dnode/src/dnodeEps.c index 317a9968fa..d5bb77bde6 100644 --- a/source/server/dnode/src/dnodeEps.c +++ b/source/server/dnode/src/dnodeEps.c @@ -16,86 +16,96 @@ #define _DEFAULT_SOURCE #include "os.h" #include "cJSON.h" +#include "thash.h" #include "tglobal.h" #include "dnodeEps.h" #include "dnodeCfg.h" -static void dnodePrintEps(SDnEps *eps) { - dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum); - for (int32_t i = 0; i < eps->dnodeNum; i++) { - SDnodeEp *ep = &eps->dnodeList[i]; +static struct { + int32_t dnodeId; + int32_t dnodeNum; + SDnodeEp * dnodeList; + SHashObj * dnodeHash; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} tsDeps; + +static void dnodePrintEps() { + dDebug("print dnodeEp, dnodeNum:%d", tsDeps.dnodeNum); + for (int32_t i = 0; i < tsDeps.dnodeNum; i++) { + SDnodeEp *ep = &tsDeps.dnodeList[i]; dDebug("dnode:%d, dnodeFqdn:%s dnodePort:%u", ep->dnodeId, ep->dnodeFqdn, ep->dnodePort); } } -static void dnodeResetEps(SDnEps *eps, SDnodeEps *data) { +static void dnodeResetEps(SDnodeEps *data) { assert(data != NULL); - if (data->dnodeNum > eps->dnodeNum) { + if (data->dnodeNum > tsDeps.dnodeNum) { SDnodeEp *tmp = calloc(data->dnodeNum, sizeof(SDnodeEp)); if (tmp == NULL) return; - tfree(eps->dnodeList); - eps->dnodeList = tmp; - eps->dnodeNum = data->dnodeNum; - memcpy(eps->dnodeList, data->dnodeEps, eps->dnodeNum * sizeof(SDnodeEp)); - dnodePrintEps(eps); + tfree(tsDeps.dnodeList); + tsDeps.dnodeList = tmp; + tsDeps.dnodeNum = data->dnodeNum; + memcpy(tsDeps.dnodeList, data->dnodeEps, tsDeps.dnodeNum * sizeof(SDnodeEp)); + dnodePrintEps(); - for (int32_t i = 0; i < eps->dnodeNum; ++i) { - SDnodeEp *ep = &eps->dnodeList[i]; - taosHashPut(eps->dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); + for (int32_t i = 0; i < tsDeps.dnodeNum; ++i) { + SDnodeEp *ep = &tsDeps.dnodeList[i]; + taosHashPut(tsDeps.dnodeHash, &ep->dnodeId, sizeof(int32_t), ep, sizeof(SDnodeEp)); } } } -static int32_t dnodeReadEps(SDnEps *eps) { +static int32_t dnodeReadEps() { int32_t len = 0; int32_t maxLen = 30000; char * content = calloc(1, maxLen + 1); cJSON * root = NULL; FILE * fp = NULL; - fp = fopen(eps->file, "r"); + fp = fopen(tsDeps.file, "r"); if (!fp) { - dDebug("file %s not exist", eps->file); + dDebug("file %s not exist", tsDeps.file); goto PRASE_EPS_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", eps->file); + dError("failed to read %s since content is null", tsDeps.file); goto PRASE_EPS_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", eps->file); + dError("failed to read %s since invalid json format", tsDeps.file); goto PRASE_EPS_OVER; } cJSON *dnodeNum = cJSON_GetObjectItem(root, "dnodeNum"); if (!dnodeNum || dnodeNum->type != cJSON_Number) { - dError("failed to read %s since dnodeNum not found", eps->file); + dError("failed to read %s since dnodeNum not found", tsDeps.file); goto PRASE_EPS_OVER; } cJSON *dnodeInfos = cJSON_GetObjectItem(root, "dnodeInfos"); if (!dnodeInfos || dnodeInfos->type != cJSON_Array) { - dError("failed to read %s since dnodeInfos not found", eps->file); + dError("failed to read %s since dnodeInfos not found", tsDeps.file); goto PRASE_EPS_OVER; } int32_t dnodeInfosSize = cJSON_GetArraySize(dnodeInfos); if (dnodeInfosSize != dnodeNum->valueint) { - dError("failed to read %s since dnodeInfos size:%d not matched dnodeNum:%d", eps->file, dnodeInfosSize, + dError("failed to read %s since dnodeInfos size:%d not matched dnodeNum:%d", tsDeps.file, dnodeInfosSize, (int32_t)dnodeNum->valueint); goto PRASE_EPS_OVER; } - eps->dnodeNum = dnodeInfosSize; - eps->dnodeList = calloc(dnodeInfosSize, sizeof(SDnodeEp)); - if (eps->dnodeList == NULL) { + tsDeps.dnodeNum = dnodeInfosSize; + tsDeps.dnodeList = calloc(dnodeInfosSize, sizeof(SDnodeEp)); + if (tsDeps.dnodeList == NULL) { dError("failed to calloc dnodeEpList since %s", strerror(errno)); goto PRASE_EPS_OVER; } @@ -104,40 +114,40 @@ static int32_t dnodeReadEps(SDnEps *eps) { cJSON *dnodeInfo = cJSON_GetArrayItem(dnodeInfos, i); if (dnodeInfo == NULL) break; - SDnodeEp *ep = &eps->dnodeList[i]; + SDnodeEp *ep = &tsDeps.dnodeList[i]; cJSON *dnodeId = cJSON_GetObjectItem(dnodeInfo, "dnodeId"); if (!dnodeId || dnodeId->type != cJSON_Number) { - dError("failed to read %s, dnodeId not found", eps->file); + dError("failed to read %s, dnodeId not found", tsDeps.file); goto PRASE_EPS_OVER; } ep->dnodeId = (int32_t)dnodeId->valueint; cJSON *dnodeFqdn = cJSON_GetObjectItem(dnodeInfo, "dnodeFqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { - dError("failed to read %s, dnodeFqdn not found", eps->file); + dError("failed to read %s, dnodeFqdn not found", tsDeps.file); goto PRASE_EPS_OVER; } tstrncpy(ep->dnodeFqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(dnodeInfo, "dnodePort"); if (!dnodePort || dnodePort->type != cJSON_Number) { - dError("failed to read %s, dnodePort not found", eps->file); + dError("failed to read %s, dnodePort not found", tsDeps.file); goto PRASE_EPS_OVER; } ep->dnodePort = (uint16_t)dnodePort->valueint; } - dInfo("succcessed to read file %s", eps->file); - dnodePrintEps(eps); + dInfo("succcessed to read file %s", tsDeps.file); + dnodePrintEps(); PRASE_EPS_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); - if (dnodeIsDnodeEpChanged(eps, eps->dnodeId, tsLocalEp)) { - dError("dnode:%d, localEp different from %s dnodeEps.json and need reconfigured", eps->dnodeId, tsLocalEp); + if (dnodeIsDnodeEpChanged(tsDeps.dnodeId, tsLocalEp)) { + dError("dnode:%d, localEp different from %s dnodeEps.json and need reconfigured", tsDeps.dnodeId, tsLocalEp); return -1; } @@ -145,10 +155,10 @@ PRASE_EPS_OVER: return 0; } -static int32_t dnodeWriteEps(SDnEps *eps) { - FILE *fp = fopen(eps->file, "w"); +static int32_t dnodeWriteEps() { + FILE *fp = fopen(tsDeps.file, "w"); if (!fp) { - dError("failed to write %s since %s", eps->file, strerror(errno)); + dError("failed to write %s since %s", tsDeps.file, strerror(errno)); return -1; } @@ -157,14 +167,14 @@ static int32_t dnodeWriteEps(SDnEps *eps) { char * content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", eps->dnodeNum); + len += snprintf(content + len, maxLen - len, " \"dnodeNum\": %d,\n", tsDeps.dnodeNum); len += snprintf(content + len, maxLen - len, " \"dnodeInfos\": [{\n"); - for (int32_t i = 0; i < eps->dnodeNum; ++i) { - SDnodeEp *ep = &eps->dnodeList[i]; + for (int32_t i = 0; i < tsDeps.dnodeNum; ++i) { + SDnodeEp *ep = &tsDeps.dnodeList[i]; len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", ep->dnodeId); len += snprintf(content + len, maxLen - len, " \"dnodeFqdn\": \"%s\",\n", ep->dnodeFqdn); len += snprintf(content + len, maxLen - len, " \"dnodePort\": %u\n", ep->dnodePort); - if (i < eps->dnodeNum - 1) { + if (i < tsDeps.dnodeNum - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -178,24 +188,20 @@ static int32_t dnodeWriteEps(SDnEps *eps) { free(content); terrno = 0; - dInfo("successed to write %s", eps->file); + dInfo("successed to write %s", tsDeps.file); return 0; } -int32_t dnodeInitEps(SDnEps **out) { - SDnEps *eps = calloc(1, sizeof(SDnEps)); - if (eps == NULL) return -1; +int32_t dnodeInitEps() { + tsDeps.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (tsDeps.dnodeHash == NULL) return -1; - eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (eps->dnodeHash == NULL) return -1; + tsDeps.dnodeId = dnodeGetDnodeId(); + tsDeps.dnodeNum = 0; + snprintf(tsDeps.file, sizeof(tsDeps.file), "%s/dnodeEps.json", tsDnodeDir); + pthread_mutex_init(&tsDeps.mutex, NULL); - eps->dnodeId = dnodeInst()->cfg->dnodeId; - eps->dnodeNum = 0; - snprintf(eps->file, sizeof(eps->file), "%s/dnodeEps.json", tsDnodeDir); - pthread_mutex_init(&eps->mutex, NULL); - *out = eps; - - int32_t ret = dnodeReadEps(eps); + int32_t ret = dnodeReadEps(); if (ret == 0) { dInfo("dnode eps is initialized"); } @@ -203,29 +209,25 @@ int32_t dnodeInitEps(SDnEps **out) { return ret; } -void dnodeCleanupEps(SDnEps **out) { - SDnEps *eps = *out; - *out = NULL; +void dnodeCleanupEps() { + pthread_mutex_lock(&tsDeps.mutex); - pthread_mutex_lock(&eps->mutex); - - if (eps->dnodeList != NULL) { - free(eps->dnodeList); - eps->dnodeList = NULL; + if (tsDeps.dnodeList != NULL) { + free(tsDeps.dnodeList); + tsDeps.dnodeList = NULL; } - if (eps->dnodeHash) { - taosHashCleanup(eps->dnodeHash); - eps->dnodeHash = NULL; + if (tsDeps.dnodeHash) { + taosHashCleanup(tsDeps.dnodeHash); + tsDeps.dnodeHash = NULL; } - eps->dnodeNum = 0; - pthread_mutex_unlock(&eps->mutex); - pthread_mutex_destroy(&eps->mutex); - free(eps); + tsDeps.dnodeNum = 0; + pthread_mutex_unlock(&tsDeps.mutex); + pthread_mutex_destroy(&tsDeps.mutex); } -void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data) { +void dnodeUpdateEps(SDnodeEps *data) { if (data == NULL || data->dnodeNum <= 0) return; data->dnodeNum = htonl(data->dnodeNum); @@ -234,28 +236,28 @@ void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data) { data->dnodeEps[i].dnodePort = htons(data->dnodeEps[i].dnodePort); } - pthread_mutex_lock(&eps->mutex); + pthread_mutex_lock(&tsDeps.mutex); - if (data->dnodeNum != eps->dnodeNum) { - dnodeResetEps(eps, data); - dnodeWriteEps(eps); + if (data->dnodeNum != tsDeps.dnodeNum) { + dnodeResetEps(data); + dnodeWriteEps(); } else { int32_t size = data->dnodeNum * sizeof(SDnodeEp); - if (memcmp(eps->dnodeList, data->dnodeEps, size) != 0) { - dnodeResetEps(eps, data); - dnodeWriteEps(eps); + if (memcmp(tsDeps.dnodeList, data->dnodeEps, size) != 0) { + dnodeResetEps(data); + dnodeWriteEps(); } } - pthread_mutex_unlock(&eps->mutex); + pthread_mutex_unlock(&tsDeps.mutex); } -bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr) { +bool dnodeIsDnodeEpChanged(int32_t dnodeId, char *epstr) { bool changed = false; - pthread_mutex_lock(&eps->mutex); + pthread_mutex_lock(&tsDeps.mutex); - SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *ep = taosHashGet(tsDeps.dnodeHash, &dnodeId, sizeof(int32_t)); if (ep != NULL) { char epSaved[TSDB_EP_LEN + 1]; snprintf(epSaved, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); @@ -263,21 +265,20 @@ bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr) { tstrncpy(epstr, epSaved, TSDB_EP_LEN); } - pthread_mutex_unlock(&eps->mutex); + pthread_mutex_unlock(&tsDeps.mutex); return changed; } void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { - SDnEps *eps = dnodeInst()->eps; - pthread_mutex_lock(&eps->mutex); + pthread_mutex_lock(&tsDeps.mutex); - SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t)); + SDnodeEp *ep = taosHashGet(tsDeps.dnodeHash, &dnodeId, sizeof(int32_t)); if (ep != NULL) { if (port) *port = ep->dnodePort; if (fqdn) tstrncpy(fqdn, ep->dnodeFqdn, TSDB_FQDN_LEN); if (epstr) snprintf(epstr, TSDB_EP_LEN, "%s:%u", ep->dnodeFqdn, ep->dnodePort); } - pthread_mutex_unlock(&eps->mutex); + pthread_mutex_unlock(&tsDeps.mutex); } diff --git a/source/server/dnode/src/dnodeInt.c b/source/server/dnode/src/dnodeInt.c index abb552cb02..84a7e2565b 100644 --- a/source/server/dnode/src/dnodeInt.c +++ b/source/server/dnode/src/dnodeInt.c @@ -33,10 +33,7 @@ #include "mnode.h" #include "vnode.h" -SDnode *dnodeInst() { - static SDnode inst = {0}; - return &inst; -} +static struct SSteps *tsSteps; static int32_t dnodeInitVnodeModule(void **unused) { SVnodePara para; @@ -48,58 +45,50 @@ static int32_t dnodeInitVnodeModule(void **unused) { } static int32_t dnodeInitMnodeModule(void **unused) { - SDnode *dnode = dnodeInst(); - SMnodePara para; para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendRedirectMsg = dnodeSendRedirectMsg; - para.dnodeId = dnode->cfg->dnodeId; - strncpy(para.clusterId, dnode->cfg->clusterId, sizeof(para.clusterId)); + dnodeGetCfg(¶.dnodeId, para.clusterId); return mnodeInit(para); } int32_t dnodeInit() { - struct SSteps *steps = taosStepInit(24, dnodeReportStartup); - if (steps == NULL) return -1; + tsSteps = taosStepInit(24, dnodeReportStartup); + if (tsSteps == NULL) return -1; - SDnode *dnode = dnodeInst(); + taosStepAdd(tsSteps, "dnode-main", dnodeInitMain, dnodeCleanupMain); + taosStepAdd(tsSteps, "dnode-storage", dnodeInitStorage, dnodeCleanupStorage); + //taosStepAdd(tsSteps, "dnode-tfs", tfInit, tfCleanup); + taosStepAdd(tsSteps, "dnode-rpc", rpcInit, rpcCleanup); + taosStepAdd(tsSteps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck); + taosStepAdd(tsSteps, "dnode-cfg", dnodeInitCfg, dnodeCleanupCfg); + taosStepAdd(tsSteps, "dnode-deps", dnodeInitEps, dnodeCleanupEps); + taosStepAdd(tsSteps, "dnode-meps", dnodeInitMnodeEps, dnodeCleanupMnodeEps); + //taosStepAdd(tsSteps, "dnode-wal", walInit, walCleanUp); + //taosStepAdd(tsSteps, "dnode-sync", syncInit, syncCleanUp); + taosStepAdd(tsSteps, "dnode-vnode", dnodeInitVnodeModule, vnodeCleanup); + taosStepAdd(tsSteps, "dnode-mnode", dnodeInitMnodeModule, mnodeCleanup); + taosStepAdd(tsSteps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans); + taosStepAdd(tsSteps, "dnode-status", dnodeInitStatus, dnodeCleanupStatus); + taosStepAdd(tsSteps, "dnode-telem", dnodeInitTelem, dnodeCleanupTelem); + //taosStepAdd(tsSteps, "dnode-script",scriptEnvPoolInit, scriptEnvPoolCleanup); - taosStepAdd(steps, "dnode-main", (void **)&dnode->main, (InitFp)dnodeInitMain, (CleanupFp)dnodeCleanupMain); - taosStepAdd(steps, "dnode-storage", NULL, (InitFp)dnodeInitStorage, (CleanupFp)dnodeCleanupStorage); - //taosStepAdd(steps, "dnode-tfs", NULL, (InitFp)tfInit, (CleanupFp)tfCleanup); - taosStepAdd(steps, "dnode-rpc", NULL, (InitFp)rpcInit, (CleanupFp)rpcCleanup); - taosStepAdd(steps, "dnode-check", (void **)&dnode->check, (InitFp)dnodeInitCheck, (CleanupFp)dnodeCleanupCheck); - taosStepAdd(steps, "dnode-cfg", (void **)&dnode->cfg, (InitFp)dnodeInitCfg, (CleanupFp)dnodeCleanupCfg); - taosStepAdd(steps, "dnode-deps", (void **)&dnode->eps, (InitFp)dnodeInitEps, (CleanupFp)dnodeCleanupEps); - taosStepAdd(steps, "dnode-meps", (void **)&dnode->meps, (InitFp)dnodeInitMnodeEps, (CleanupFp)dnodeCleanupMnodeEps); - //taosStepAdd(steps, "dnode-wal", NULL, (InitFp)walInit, (CleanupFp)walCleanUp); - //taosStepAdd(steps, "dnode-sync", NULL, (InitFp)syncInit, (CleanupFp)syncCleanUp); - taosStepAdd(steps, "dnode-vnode", NULL, (InitFp)dnodeInitVnodeModule, (CleanupFp)vnodeCleanup); - taosStepAdd(steps, "dnode-mnode", NULL, (InitFp)dnodeInitMnodeModule, (CleanupFp)mnodeCleanup); - taosStepAdd(steps, "dnode-trans", (void **)&dnode->trans, (InitFp)dnodeInitTrans, (CleanupFp)dnodeCleanupTrans); - taosStepAdd(steps, "dnode-status", (void **)&dnode->status, (InitFp)dnodeInitStatus, (CleanupFp)dnodeCleanupStatus); - taosStepAdd(steps, "dnode-telem", (void **)&dnode->telem, (InitFp)dnodeInitTelem, (CleanupFp)dnodeCleanupTelem); - //taosStepAdd(steps, "dnode-script", NULL, (InitFp)scriptEnvPoolInit, (CleanupFp)scriptEnvPoolCleanup); + taosStepExec(tsSteps); - dnode->steps = steps; - taosStepExec(dnode->steps); - - if (dnode->main) { - dnode->main->runStatus = TD_RUN_STAT_RUNNING; - dnodeReportStartupFinished("TDengine", "initialized successfully"); - dInfo("TDengine is initialized successfully"); - } + dnodeSetRunStat(TD_RUN_STAT_RUNNING); + dnodeReportStartupFinished("TDengine", "initialized successfully"); + dInfo("TDengine is initialized successfully"); return 0; } void dnodeCleanup() { - SDnode *dnode = dnodeInst(); - if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) { - dnode->main->runStatus = TD_RUN_STAT_STOPPED; - taosStepCleanup(dnode->steps); + if (dnodeGetRunStat() != TD_RUN_STAT_STOPPED) { + dnodeSetRunStat(TD_RUN_STAT_STOPPED); + taosStepCleanup(tsSteps); + tsSteps = NULL; } } diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c index 410cb41eed..24de3b7924 100644 --- a/source/server/dnode/src/dnodeMain.c +++ b/source/server/dnode/src/dnodeMain.c @@ -17,15 +17,22 @@ #include "os.h" #include "tcache.h" #include "tconfig.h" +#include "tglobal.h" #if 0 #include "tfs.h" #endif +#include "tnote.h" +#include "tcompression.h" +#include "ttimer.h" #include "dnodeCfg.h" #include "dnodeMain.h" #include "mnode.h" -#include "tcompression.h" -#include "tnote.h" -#include "ttimer.h" + +static struct { + RunStat runStatus; + void * dnodeTimer; + SStartupStep startup; +} tsDmain; static void dnodeCheckDataDirOpenned(char *dir) { #if 0 @@ -47,27 +54,14 @@ static void dnodeCheckDataDirOpenned(char *dir) { #endif } -void dnodePrintDiskInfo() { - dInfo("=================================="); - dInfo(" os totalDisk: %f(GB)", tsTotalDataDirGB); - dInfo(" os usedDisk: %f(GB)", tsUsedDataDirGB); - dInfo(" os availDisk: %f(GB)", tsAvailDataDirGB); - dInfo("=================================="); -} - -int32_t dnodeInitMain(SDnMain **out) { - SDnMain* main = calloc(1, sizeof(SDnMain)); - if (main == NULL) return -1; - - main->runStatus = TD_RUN_STAT_STOPPED; - main->dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); - if (main->dnodeTimer == NULL) { +int32_t dnodeInitMain() { + tsDmain.runStatus = TD_RUN_STAT_STOPPED; + tsDmain.dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); + if (tsDmain.dnodeTimer == NULL) { dError("failed to init dnode timer"); return -1; } - *out = main; - tscEmbedded = 1; taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -76,7 +70,6 @@ int32_t dnodeInitMain(SDnMain **out) { taosReadGlobalLogCfg(); taosSetCoreDump(tsEnableCoreFile); - if (!taosMkDir(tsLogDir)) { printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); return -1; @@ -101,13 +94,10 @@ int32_t dnodeInitMain(SDnMain **out) { return taosCheckGlobalCfg(); } -void dnodeCleanupMain(SDnMain **out) { - SDnMain *main = *out; - *out = NULL; - - if (main->dnodeTimer != NULL) { - taosTmrCleanUp(main->dnodeTimer); - main->dnodeTimer = NULL; +void dnodeCleanupMain() { + if (tsDmain.dnodeTimer != NULL) { + taosTmrCleanUp(tsDmain.dnodeTimer); + tsDmain.dnodeTimer = NULL; } #if 0 @@ -115,8 +105,6 @@ void dnodeCleanupMain(SDnMain **out) { #endif taosCloseLog(); taosStopCacheRefreshWorker(); - - free(main); } int32_t dnodeInitStorage() { @@ -138,7 +126,7 @@ int32_t dnodeInitStorage() { } strncpy(tsDataDir, TFS_PRIMARY_PATH(), TSDB_FILENAME_LEN); -#endif +#endif sprintf(tsMnodeDir, "%s/mnode", tsDataDir); sprintf(tsVnodeDir, "%s/vnode", tsDataDir); sprintf(tsDnodeDir, "%s/dnode", tsDataDir); @@ -164,7 +152,6 @@ int32_t dnodeInitStorage() { return -1; } - TDIR *tdir = tfsOpendir("vnode_bak/.staging"); bool stagingNotEmpty = tfsReaddir(tdir) != NULL; tfsClosedir(tdir); @@ -190,7 +177,7 @@ int32_t dnodeInitStorage() { } void dnodeCleanupStorage() { -#if 0 +#if 0 // storage destroy tfsDestroy(); @@ -202,18 +189,14 @@ void dnodeCleanupStorage() { } void dnodeReportStartup(char *name, char *desc) { - SDnode *dnode = dnodeInst(); - if (dnode->main != NULL) { - SStartupStep *startup = &dnode->main->startup; - tstrncpy(startup->name, name, strlen(startup->name)); - tstrncpy(startup->desc, desc, strlen(startup->desc)); - startup->finished = 0; - } + SStartupStep *startup = &tsDmain.startup; + tstrncpy(startup->name, name, strlen(startup->name)); + tstrncpy(startup->desc, desc, strlen(startup->desc)); + startup->finished = 0; } void dnodeReportStartupFinished(char *name, char *desc) { - SDnode *dnode = dnodeInst(); - SStartupStep *startup = &dnode->main->startup; + SStartupStep *startup = &tsDmain.startup; tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->desc, desc, strlen(startup->desc)); startup->finished = 1; @@ -222,9 +205,8 @@ void dnodeReportStartupFinished(char *name, char *desc) { void dnodeProcessStartupReq(SRpcMsg *pMsg) { dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); - SDnode *dnode = dnodeInst(); SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); - memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep)); + memcpy(pStep, &tsDmain.startup, sizeof(SStartupStep)); dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished); @@ -234,12 +216,11 @@ void dnodeProcessStartupReq(SRpcMsg *pMsg) { } static int32_t dnodeStartMnode(SRpcMsg *pMsg) { - SDnode *dnode = dnodeInst(); SCreateMnodeMsg *pCfg = pMsg->pCont; pCfg->dnodeId = htonl(pCfg->dnodeId); - if (pCfg->dnodeId != dnode->cfg->dnodeId) { + if (pCfg->dnodeId != dnodeGetDnodeId()) { dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, - dnodeGetDnodeId(dnode->cfg)); + dnodeGetDnodeId()); return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; } @@ -277,4 +258,10 @@ void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) { rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); -} \ No newline at end of file +} + +RunStat dnodeGetRunStat() { return tsDmain.runStatus; } + +void dnodeSetRunStat(RunStat stat) { tsDmain.runStatus = stat; } + +void* dnodeGetTimer() { return tsDmain.dnodeTimer; } \ No newline at end of file diff --git a/source/server/dnode/src/dnodeMnodeEps.c b/source/server/dnode/src/dnodeMnodeEps.c index ab5c76b580..5646fd5363 100644 --- a/source/server/dnode/src/dnodeMnodeEps.c +++ b/source/server/dnode/src/dnodeMnodeEps.c @@ -22,43 +22,51 @@ #include "dnodeMnodeEps.h" #include "mnode.h" -static void dnodePrintMnodeEps(SDnMnEps *meps) { - SRpcEpSet *epset = &meps->mnodeEpSet; +static struct { + SRpcEpSet mnodeEpSet; + SMInfos mnodeInfos; + char file[PATH_MAX + 20]; + pthread_mutex_t mutex; +} tsDmeps; + + +static void dnodePrintMnodeEps() { + SRpcEpSet *epset = &tsDmeps.mnodeEpSet; dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse); for (int32_t i = 0; i < epset->numOfEps; i++) { dInfo("ep index:%d, %s:%u", i, epset->fqdn[i], epset->port[i]); } } -static void dnodeResetMnodeEps(SDnMnEps *meps, SMInfos *mInfos) { +static void dnodeResetMnodeEps(SMInfos *mInfos) { if (mInfos == NULL || mInfos->mnodeNum == 0) { - meps->mnodeEpSet.numOfEps = 1; - taosGetFqdnPortFromEp(tsFirst, meps->mnodeEpSet.fqdn[0], &meps->mnodeEpSet.port[0]); + tsDmeps.mnodeEpSet.numOfEps = 1; + taosGetFqdnPortFromEp(tsFirst, tsDmeps.mnodeEpSet.fqdn[0], &tsDmeps.mnodeEpSet.port[0]); if (strcmp(tsSecond, tsFirst) != 0) { - meps->mnodeEpSet.numOfEps = 2; - taosGetFqdnPortFromEp(tsSecond, meps->mnodeEpSet.fqdn[1], &meps->mnodeEpSet.port[1]); + tsDmeps.mnodeEpSet.numOfEps = 2; + taosGetFqdnPortFromEp(tsSecond, tsDmeps.mnodeEpSet.fqdn[1], &tsDmeps.mnodeEpSet.port[1]); } - dnodePrintMnodeEps(meps); + dnodePrintMnodeEps(); return; } - int32_t size = sizeof(SMInfos); - memcpy(&meps->mnodeInfos, mInfos, size); + int32_t size = sizeof(SMInfos); + memcpy(&tsDmeps.mnodeInfos, mInfos, size); - meps->mnodeEpSet.inUse = meps->mnodeInfos.inUse; - meps->mnodeEpSet.numOfEps = meps->mnodeInfos.mnodeNum; - for (int32_t i = 0; i < meps->mnodeInfos.mnodeNum; i++) { - taosGetFqdnPortFromEp(meps->mnodeInfos.mnodeInfos[i].mnodeEp, meps->mnodeEpSet.fqdn[i], &meps->mnodeEpSet.port[i]); + tsDmeps.mnodeEpSet.inUse = tsDmeps.mnodeInfos.inUse; + tsDmeps.mnodeEpSet.numOfEps = tsDmeps.mnodeInfos.mnodeNum; + for (int32_t i = 0; i < tsDmeps.mnodeInfos.mnodeNum; i++) { + taosGetFqdnPortFromEp(tsDmeps.mnodeInfos.mnodeInfos[i].mnodeEp, tsDmeps.mnodeEpSet.fqdn[i], &tsDmeps.mnodeEpSet.port[i]); } - dnodePrintMnodeEps(meps); + dnodePrintMnodeEps(); } -static int32_t dnodeWriteMnodeEps(SDnMnEps *meps) { - FILE *fp = fopen(meps->file, "w"); +static int32_t dnodeWriteMnodeEps() { + FILE *fp = fopen(tsDmeps.file, "w"); if (!fp) { - dError("failed to write %s since %s", meps->file, strerror(errno)); + dError("failed to write %s since %s", tsDmeps.file, strerror(errno)); return -1; } @@ -67,13 +75,13 @@ static int32_t dnodeWriteMnodeEps(SDnMnEps *meps) { char * content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", meps->mnodeInfos.inUse); - len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", meps->mnodeInfos.mnodeNum); + len += snprintf(content + len, maxLen - len, " \"inUse\": %d,\n", tsDmeps.mnodeInfos.inUse); + len += snprintf(content + len, maxLen - len, " \"nodeNum\": %d,\n", tsDmeps.mnodeInfos.mnodeNum); len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); - for (int32_t i = 0; i < meps->mnodeInfos.mnodeNum; i++) { - len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", meps->mnodeInfos.mnodeInfos[i].mnodeId); - len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", meps->mnodeInfos.mnodeInfos[i].mnodeEp); - if (i < meps->mnodeInfos.mnodeNum - 1) { + for (int32_t i = 0; i < tsDmeps.mnodeInfos.mnodeNum; i++) { + len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", tsDmeps.mnodeInfos.mnodeInfos[i].mnodeId); + len += snprintf(content + len, maxLen - len, " \"nodeEp\": \"%s\"\n", tsDmeps.mnodeInfos.mnodeInfos[i].mnodeEp); + if (i < tsDmeps.mnodeInfos.mnodeNum - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -87,11 +95,11 @@ static int32_t dnodeWriteMnodeEps(SDnMnEps *meps) { free(content); terrno = 0; - dInfo("successed to write %s", meps->file); + dInfo("successed to write %s", tsDmeps.file); return 0; } -static int32_t dnodeReadMnodeEps(SDnMnEps *meps, SDnEps *deps) { +static int32_t dnodeReadMnodeEps() { int32_t len = 0; int32_t maxLen = 2000; char * content = calloc(1, maxLen + 1); @@ -100,22 +108,22 @@ static int32_t dnodeReadMnodeEps(SDnMnEps *meps, SDnEps *deps) { SMInfos mInfos = {0}; bool nodeChanged = false; - fp = fopen(meps->file, "r"); + fp = fopen(tsDmeps.file, "r"); if (!fp) { - dDebug("file %s not exist", meps->file); + dDebug("file %s not exist", tsDmeps.file); goto PARSE_MINFOS_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", meps->file); + dError("failed to read %s since content is null", tsDmeps.file); goto PARSE_MINFOS_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", meps->file); + dError("failed to read %s since invalid json format", tsDmeps.file); goto PARSE_MINFOS_OVER; } @@ -124,7 +132,7 @@ static int32_t dnodeReadMnodeEps(SDnMnEps *meps, SDnEps *deps) { dError("failed to read mnodeEpSet.json since inUse not found"); goto PARSE_MINFOS_OVER; } - meps->mnodeInfos.inUse = (int8_t)inUse->valueint; + tsDmeps.mnodeInfos.inUse = (int8_t)inUse->valueint; cJSON *nodeNum = cJSON_GetObjectItem(root, "nodeNum"); if (!nodeNum || nodeNum->type != cJSON_Number) { @@ -165,11 +173,11 @@ static int32_t dnodeReadMnodeEps(SDnMnEps *meps, SDnEps *deps) { mInfo->mnodeId = (int32_t)nodeId->valueint; tstrncpy(mInfo->mnodeEp, nodeEp->valuestring, TSDB_EP_LEN); - bool changed = dnodeIsDnodeEpChanged(deps, mInfo->mnodeId, mInfo->mnodeEp); + bool changed = dnodeIsDnodeEpChanged(mInfo->mnodeId, mInfo->mnodeEp); if (changed) nodeChanged = changed; } - dInfo("successed to read file %s", meps->file); + dInfo("successed to read file %s", tsDmeps.file); PARSE_MINFOS_OVER: if (content != NULL) free(content); @@ -182,25 +190,24 @@ PARSE_MINFOS_OVER: dnodeGetDnodeEp(mInfo->mnodeId, mInfo->mnodeEp, NULL, NULL); } - dnodeResetMnodeEps(meps, &mInfos); + dnodeResetMnodeEps(&mInfos); if (nodeChanged) { - dnodeWriteMnodeEps(meps); + dnodeWriteMnodeEps(); } return 0; } void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { - SDnMnEps *meps = dnodeInst()->meps; SRpcConnInfo connInfo = {0}; rpcGetConnInfo(rpcMsg->handle, &connInfo); SRpcEpSet epSet = {0}; if (forShell) { - dnodeGetEpSetForShell(meps, &epSet); + dnodeGetEpSetForShell(&epSet); } else { - dnodeGetEpSetForPeer(meps, &epSet); + dnodeGetEpSetForPeer(&epSet); } dDebug("msg:%s will be redirected, dnodeIp:%s user:%s, numOfEps:%d inUse:%d", taosMsg[rpcMsg->msgType], @@ -222,16 +229,12 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { rpcSendRedirectRsp(rpcMsg->handle, &epSet); } -int32_t dnodeInitMnodeEps(SDnMnEps **out) { - SDnMnEps *meps = calloc(1, sizeof(SDnMnEps)); - if (meps == NULL) return -1; +int32_t dnodeInitMnodeEps() { + snprintf(tsDmeps.file, sizeof(tsDmeps.file), "%s/mnodeEpSet.json", tsDnodeDir); + pthread_mutex_init(&tsDmeps.mutex, NULL); - snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir); - pthread_mutex_init(&meps->mutex, NULL); - *out = meps; - - dnodeResetMnodeEps(meps, NULL); - int32_t ret = dnodeReadMnodeEps(meps, dnodeInst()->eps); + dnodeResetMnodeEps(NULL); + int32_t ret = dnodeReadMnodeEps(); if (ret == 0) { dInfo("dnode mInfos is initialized"); } @@ -239,17 +242,11 @@ int32_t dnodeInitMnodeEps(SDnMnEps **out) { return ret; } -void dnodeCleanupMnodeEps(SDnMnEps **out) { - SDnMnEps *meps = *out; - *out = NULL; - - if (meps != NULL) { - pthread_mutex_destroy(&meps->mutex); - free(meps); - } +void dnodeCleanupMnodeEps() { + pthread_mutex_destroy(&tsDmeps.mutex); } -void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *mInfos) { +void dnodeUpdateMnodeFromStatus(SMInfos *mInfos) { if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) { dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum); return; @@ -264,53 +261,51 @@ void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *mInfos) { } } - pthread_mutex_lock(&meps->mutex); - if (mInfos->mnodeNum != meps->mnodeInfos.mnodeNum) { - dnodeResetMnodeEps(meps, mInfos); - dnodeWriteMnodeEps(meps); + pthread_mutex_lock(&tsDmeps.mutex); + if (mInfos->mnodeNum != tsDmeps.mnodeInfos.mnodeNum) { + dnodeResetMnodeEps(mInfos); + dnodeWriteMnodeEps(); } else { int32_t size = sizeof(SMInfos); - if (memcmp(mInfos, &meps->mnodeInfos, size) != 0) { - dnodeResetMnodeEps(meps, mInfos); - dnodeWriteMnodeEps(meps); + if (memcmp(mInfos, &tsDmeps.mnodeInfos, size) != 0) { + dnodeResetMnodeEps(mInfos); + dnodeWriteMnodeEps(); } } - pthread_mutex_unlock(&meps->mutex); + pthread_mutex_unlock(&tsDmeps.mutex); } -void dnodeUpdateMnodeFromPeer(SDnMnEps *meps, SRpcEpSet *ep) { +void dnodeUpdateMnodeFromPeer(SRpcEpSet *ep) { if (ep->numOfEps <= 0) { dError("mInfos is changed, but content is invalid, discard it"); return; } - pthread_mutex_lock(&meps->mutex); + pthread_mutex_lock(&tsDmeps.mutex); dInfo("mInfos is changed, numOfEps:%d inUse:%d", ep->numOfEps, ep->inUse); for (int32_t i = 0; i < ep->numOfEps; ++i) { ep->port[i] -= TSDB_PORT_DNODEDNODE; dInfo("minfo:%d %s:%u", i, ep->fqdn[i], ep->port[i]); } - meps->mnodeEpSet = *ep; + tsDmeps.mnodeEpSet = *ep; - pthread_mutex_unlock(&meps->mutex); + pthread_mutex_unlock(&tsDmeps.mutex); } -void dnodeGetEpSetForPeer(SDnMnEps *meps, SRpcEpSet *epSet) { - pthread_mutex_lock(&meps->mutex); +void dnodeGetEpSetForPeer(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsDmeps.mutex); - *epSet = meps->mnodeEpSet; + *epSet = tsDmeps.mnodeEpSet; for (int32_t i = 0; i < epSet->numOfEps; ++i) { epSet->port[i] += TSDB_PORT_DNODEDNODE; } - pthread_mutex_unlock(&meps->mutex); + pthread_mutex_unlock(&tsDmeps.mutex); } -void dnodeGetEpSetForShell(SDnMnEps *meps, SRpcEpSet *epSet) { - pthread_mutex_lock(&meps->mutex); - - *epSet = meps->mnodeEpSet; - - pthread_mutex_unlock(&meps->mutex); +void dnodeGetEpSetForShell(SRpcEpSet *epSet) { + pthread_mutex_lock(&tsDmeps.mutex); + *epSet = tsDmeps.mnodeEpSet; + pthread_mutex_unlock(&tsDmeps.mutex); } diff --git a/source/server/dnode/src/dnodeStatus.c b/source/server/dnode/src/dnodeStatus.c index 6abc886147..38b685c1fb 100644 --- a/source/server/dnode/src/dnodeStatus.c +++ b/source/server/dnode/src/dnodeStatus.c @@ -25,15 +25,15 @@ #include "dnodeMain.h" #include "vnode.h" -static void dnodeSendStatusMsg(void *handle, void *tmrId) { - SDnStatus *status = handle; - if (status->dnodeTimer == NULL) { - dError("dnode timer is already released"); - return; - } +static struct { + void * dnodeTimer; + void * statusTimer; + uint32_t rebootTime; +} tsStatus; - if (status->statusTimer == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); +static void dnodeSendStatusMsg(void *handle, void *tmrId) { + if (tsStatus.statusTimer == NULL) { + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); dError("failed to start status timer"); return; } @@ -41,16 +41,15 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SStatusMsg *pStatus = rpcMallocCont(contLen); if (pStatus == NULL) { - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); dError("failed to malloc status message"); return; } - SDnode *dnode = dnodeInst(); - dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId); - pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg)); + dnodeGetCfg(&pStatus->dnodeId, pStatus->clusterId); + pStatus->dnodeId = htonl(dnodeGetDnodeId()); pStatus->version = htonl(tsVersion); - pStatus->lastReboot = htonl(status->rebootTime); + pStatus->lastReboot = htonl(tsStatus.rebootTime); pStatus->numOfCores = htons((uint16_t)tsNumOfCores); pStatus->diskAvailable = tsAvailDataDirGB; pStatus->alternativeRole = tsAlternativeRole; @@ -80,69 +79,58 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) { contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad); pStatus->openVnodes = htons(pStatus->openVnodes); - SRpcMsg rpcMsg = {.ahandle = status, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; + SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS}; dnodeSendMsgToMnode(&rpcMsg); } void dnodeProcessStatusRsp(SRpcMsg *pMsg) { - SDnode *dnode = dnodeInst(); - SDnStatus *status = pMsg->ahandle; - if (pMsg->code != TSDB_CODE_SUCCESS) { dError("status rsp is received, error:%s", tstrerror(pMsg->code)); if (pMsg->code == TSDB_CODE_MND_DNODE_NOT_EXIST) { char clusterId[TSDB_CLUSTER_ID_LEN]; - dnodeGetClusterId(dnode->cfg, clusterId); + dnodeGetClusterId(clusterId); if (clusterId[0] != '\0') { - dnodeSetDropped(dnode->cfg); + dnodeSetDropped(); dError("exit zombie dropped dnode"); exit(EXIT_FAILURE); } } - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); return; } SStatusRsp *pStatusRsp = pMsg->pCont; SMInfos * minfos = &pStatusRsp->mnodes; - dnodeUpdateMnodeFromStatus(dnode->meps, minfos); + dnodeUpdateMnodeFromStatus(minfos); SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg; pCfg->numOfVnodes = htonl(pCfg->numOfVnodes); pCfg->moduleStatus = htonl(pCfg->moduleStatus); pCfg->dnodeId = htonl(pCfg->dnodeId); - dnodeUpdateCfg(dnode->cfg, pCfg); + dnodeUpdateCfg(pCfg); vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes); SDnodeEps *pEps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess)); - dnodeUpdateEps(dnode->eps, pEps); + dnodeUpdateEps(pEps); - taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); + taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); } -int32_t dnodeInitStatus(SDnStatus **out) { - SDnStatus *status = calloc(1, sizeof(SDnStatus)); - if (status == NULL) return -1; - status->statusTimer = NULL; - status->dnodeTimer = dnodeInst()->main->dnodeTimer; - status->rebootTime = taosGetTimestampSec(); - taosTmrReset(dnodeSendStatusMsg, 500, status, status->dnodeTimer, &status->statusTimer); - *out = status; +int32_t dnodeInitStatus() { + tsStatus.statusTimer = NULL; + tsStatus.dnodeTimer = dnodeGetTimer(); + tsStatus.rebootTime = taosGetTimestampSec(); + taosTmrReset(dnodeSendStatusMsg, 500, NULL, tsStatus.dnodeTimer, &tsStatus.statusTimer); dInfo("dnode status timer is initialized"); return TSDB_CODE_SUCCESS; } -void dnodeCleanupStatus(SDnStatus **out) { - SDnStatus *status = *out; - *out = NULL; - - if (status->statusTimer != NULL) { - taosTmrStopA(&status->statusTimer); - status->statusTimer = NULL; +void dnodeCleanupStatus() { + if (tsStatus.statusTimer != NULL) { + taosTmrStopA(&tsStatus.statusTimer); + tsStatus.statusTimer = NULL; } - - free(status); } diff --git a/source/server/dnode/src/dnodeTelem.c b/source/server/dnode/src/dnodeTelem.c index b221746c83..7c87ea5f50 100644 --- a/source/server/dnode/src/dnodeTelem.c +++ b/source/server/dnode/src/dnodeTelem.c @@ -25,6 +25,19 @@ #define TELEMETRY_PORT 80 #define REPORT_INTERVAL 86400 +/* + * sem_timedwait is NOT implemented on MacOSX + * thus we use pthread_mutex_t/pthread_cond_t to simulate + */ +static struct { + bool enable; + pthread_mutex_t lock; + pthread_cond_t cond; + volatile int32_t exit; + pthread_t thread; + char email[TSDB_FQDN_LEN]; +} tsTelem; + static void dnodeBeginObject(SBufferWriter* bw) { tbufWriteChar(bw, '{'); } static void dnodeCloseObject(SBufferWriter* bw) { @@ -154,14 +167,14 @@ static void dnodeAddMemoryInfo(SBufferWriter* bw) { fclose(fp); } -static void dnodeAddVersionInfo(SDnTelem* telem, SBufferWriter* bw) { +static void dnodeAddVersionInfo(SBufferWriter* bw) { dnodeAddStringField(bw, "version", version); dnodeAddStringField(bw, "buildInfo", buildinfo); dnodeAddStringField(bw, "gitInfo", gitinfo); - dnodeAddStringField(bw, "email", telem->email); + dnodeAddStringField(bw, "email", tsTelem.email); } -static void dnodeAddRuntimeInfo(SDnTelem* telem, SBufferWriter* bw) { +static void dnodeAddRuntimeInfo(SBufferWriter* bw) { SMnodeStat stat = {0}; if (mnodeGetStatistics(&stat) != 0) { return; @@ -179,7 +192,7 @@ static void dnodeAddRuntimeInfo(SDnTelem* telem, SBufferWriter* bw) { dnodeAddIntField(bw, "compStorage", stat.compStorage); } -static void dnodeSendTelemetryReport(SDnTelem* telem) { +static void dnodeSendTelemetryReport() { char buf[128] = {0}; uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); if (ip == 0xffffffff) { @@ -192,16 +205,18 @@ static void dnodeSendTelemetryReport(SDnTelem* telem) { return; } - SDnode *dnode = dnodeInst(); + char clusterId[TSDB_CLUSTER_ID_LEN] = {0}; + dnodeGetClusterId(clusterId); + SBufferWriter bw = tbufInitWriter(NULL, false); dnodeBeginObject(&bw); - dnodeAddStringField(&bw, "instanceId", dnode->cfg->clusterId); + dnodeAddStringField(&bw, "instanceId", clusterId); dnodeAddIntField(&bw, "reportVersion", 1); dnodeAddOsInfo(&bw); dnodeAddCpuInfo(&bw); dnodeAddMemoryInfo(&bw); - dnodeAddVersionInfo(telem, &bw); - dnodeAddRuntimeInfo(telem, &bw); + dnodeAddVersionInfo(&bw); + dnodeAddRuntimeInfo(&bw); dnodeCloseObject(&bw); const char* header = @@ -227,25 +242,23 @@ static void dnodeSendTelemetryReport(SDnTelem* telem) { } static void* dnodeTelemThreadFp(void* param) { - SDnTelem* telem = param; - struct timespec end = {0}; clock_gettime(CLOCK_REALTIME, &end); end.tv_sec += 300; // wait 5 minutes before send first report setThreadName("dnode-telem"); - while (!telem->exit) { + while (!tsTelem.exit) { int32_t r = 0; struct timespec ts = end; - pthread_mutex_lock(&telem->lock); - r = pthread_cond_timedwait(&telem->cond, &telem->lock, &ts); - pthread_mutex_unlock(&telem->lock); + pthread_mutex_lock(&tsTelem.lock); + r = pthread_cond_timedwait(&tsTelem.cond, &tsTelem.lock, &ts); + pthread_mutex_unlock(&tsTelem.lock); if (r == 0) break; if (r != ETIMEDOUT) continue; if (mnodeIsServing()) { - dnodeSendTelemetryReport(telem); + dnodeSendTelemetryReport(); } end.tv_sec += REPORT_INTERVAL; } @@ -253,40 +266,35 @@ static void* dnodeTelemThreadFp(void* param) { return NULL; } -static void dnodeGetEmail(SDnTelem* telem, char* filepath) { +static void dnodeGetEmail(char* filepath) { int32_t fd = taosOpenFileRead(filepath); if (fd < 0) { return; } - if (taosReadFile(fd, (void*)telem->email, TSDB_FQDN_LEN) < 0) { + if (taosReadFile(fd, (void*)tsTelem.email, TSDB_FQDN_LEN) < 0) { dError("failed to read %d bytes from file %s since %s", TSDB_FQDN_LEN, filepath, strerror(errno)); } taosCloseFile(fd); } -int32_t dnodeInitTelem(SDnTelem** out) { - SDnTelem* telem = calloc(1, sizeof(SDnTelem)); - if (telem == NULL) return -1; +int32_t dnodeInitTelem() { + tsTelem.enable = tsEnableTelemetryReporting; + if (!tsTelem.enable) return 0; - telem->enable = tsEnableTelemetryReporting; - *out = telem; + tsTelem.exit = 0; + pthread_mutex_init(&tsTelem.lock, NULL); + pthread_cond_init(&tsTelem.cond, NULL); + tsTelem.email[0] = 0; - if (!telem->enable) return 0; - - telem->exit = 0; - pthread_mutex_init(&telem->lock, NULL); - pthread_cond_init(&telem->cond, NULL); - telem->email[0] = 0; - - dnodeGetEmail(telem, "/usr/local/taos/email"); + dnodeGetEmail("/usr/local/taos/email"); pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - int32_t code = pthread_create(&telem->thread, &attr, dnodeTelemThreadFp, telem); + int32_t code = pthread_create(&tsTelem.thread, &attr, dnodeTelemThreadFp, NULL); pthread_attr_destroy(&attr); if (code != 0) { dTrace("failed to create telemetry thread since :%s", strerror(code)); @@ -296,26 +304,18 @@ int32_t dnodeInitTelem(SDnTelem** out) { return 0; } -void dnodeCleanupTelem(SDnTelem** out) { - SDnTelem* telem = *out; - *out = NULL; +void dnodeCleanupTelem() { + if (!tsTelem.enable) return; - if (!telem->enable) { - free(telem); - return; + if (taosCheckPthreadValid(tsTelem.thread)) { + pthread_mutex_lock(&tsTelem.lock); + tsTelem.exit = 1; + pthread_cond_signal(&tsTelem.cond); + pthread_mutex_unlock(&tsTelem.lock); + + pthread_join(tsTelem.thread, NULL); } - if (taosCheckPthreadValid(telem->thread)) { - pthread_mutex_lock(&telem->lock); - telem->exit = 1; - pthread_cond_signal(&telem->cond); - pthread_mutex_unlock(&telem->lock); - - pthread_join(telem->thread, NULL); - } - - pthread_mutex_destroy(&telem->lock); - pthread_cond_destroy(&telem->cond); - - free(telem); + pthread_mutex_destroy(&tsTelem.lock); + pthread_cond_destroy(&tsTelem.cond); } diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 7a870f22c8..a4409674f1 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -14,7 +14,7 @@ */ /* this file is mainly responsible for the communication between DNODEs. Each - * dnode works as both server and client. SDnode may send status, grant, config + * dnode works as both server and client. Dnode may send status, grant, config * messages to mnode, mnode may send create/alter/drop table/vnode messages * to dnode. All theses messages are handled from here */ @@ -29,8 +29,19 @@ #include "vnode.h" #include "mnode.h" +typedef void (*RpcMsgFp)( SRpcMsg *pMsg); + +static struct { + void * serverRpc; + void * clientRpc; + void * shellRpc; + int32_t queryReqNum; + int32_t submitReqNum; + RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; + RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX]; +} tsTrans; + static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SDnode * dnode = dnodeInst(); SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; @@ -39,7 +50,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { return; } - if (dnode->main->runStatus != TD_RUN_STAT_RUNNING) { + if (dnodeGetRunStat() != TD_RUN_STAT_RUNNING) { rspMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rspMsg); rpcFreeCont(pMsg->pCont); @@ -53,7 +64,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { return; } - RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType]; + RpcMsgFp fp = tsTrans.peerMsgFp[pMsg->msgType]; if (fp != NULL) { (*fp)(pMsg); } else { @@ -64,27 +75,27 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } } -int32_t dnodeInitServer(SDnTrans *trans) { - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; +int32_t dnodeInitServer() { + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -96,8 +107,8 @@ int32_t dnodeInitServer(SDnTrans *trans) { rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; - trans->serverRpc = rpcOpen(&rpcInit); - if (trans->serverRpc == NULL) { + tsTrans.serverRpc = rpcOpen(&rpcInit); + if (tsTrans.serverRpc == NULL) { dError("failed to init peer rpc server"); return -1; } @@ -106,17 +117,16 @@ int32_t dnodeInitServer(SDnTrans *trans) { return 0; } -void dnodeCleanupServer(SDnTrans *trans) { - if (trans->serverRpc) { - rpcClose(trans->serverRpc); - trans->serverRpc = NULL; +void dnodeCleanupServer() { + if (tsTrans.serverRpc) { + rpcClose(tsTrans.serverRpc); + tsTrans.serverRpc = NULL; dInfo("dnode peer server is closed"); } } static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SDnode *dnode = dnodeInst(); - if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) { + if (dnodeGetRunStat() == TD_RUN_STAT_STOPPED) { if (pMsg == NULL || pMsg->pCont == NULL) return; dTrace("msg:%p is ignored since dnode is stopping", pMsg); rpcFreeCont(pMsg->pCont); @@ -124,10 +134,10 @@ static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeFromPeer(dnode->meps, pEpSet); + dnodeUpdateMnodeFromPeer(pEpSet); } - RpcMsgFp fp = dnode->trans->peerMsgFp[pMsg->msgType]; + RpcMsgFp fp = tsTrans.peerMsgFp[pMsg->msgType]; if (fp != NULL) { (*fp)(pMsg); } else { @@ -141,27 +151,27 @@ static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { rpcFreeCont(pMsg->pCont); } -int32_t dnodeInitClient(SDnTrans *trans) { - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; +int32_t dnodeInitClient() { + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg; - trans->peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; char secret[TSDB_KEY_LEN] = "secret"; SRpcInit rpcInit; @@ -176,8 +186,8 @@ int32_t dnodeInitClient(SDnTrans *trans) { rpcInit.ckey = "key"; rpcInit.secret = secret; - trans->clientRpc = rpcOpen(&rpcInit); - if (trans->clientRpc == NULL) { + tsTrans.clientRpc = rpcOpen(&rpcInit); + if (tsTrans.clientRpc == NULL) { dError("failed to init peer rpc client"); return -1; } @@ -186,26 +196,25 @@ int32_t dnodeInitClient(SDnTrans *trans) { return 0; } -void dnodeCleanupClient(SDnTrans *trans) { - if (trans->clientRpc) { - rpcClose(trans->clientRpc); - trans->clientRpc = NULL; +void dnodeCleanupClient() { + if (tsTrans.clientRpc) { + rpcClose(tsTrans.clientRpc); + tsTrans.clientRpc = NULL; dInfo("dnode peer rpc client is closed"); } } static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SDnode * dnode = dnodeInst(); SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; if (pMsg->pCont == NULL) return; - if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) { + if (dnodeGetRunStat() == TD_RUN_STAT_STOPPED) { dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_DND_EXITING; rpcSendResponse(&rpcMsg); rpcFreeCont(pMsg->pCont); return; - } else if (dnode->main->runStatus != TD_RUN_STAT_RUNNING) { + } else if (dnodeGetRunStat() != TD_RUN_STAT_RUNNING) { dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); rpcMsg.code = TSDB_CODE_APP_NOT_READY; rpcSendResponse(&rpcMsg); @@ -213,14 +222,13 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { return; } - SDnTrans *trans = dnode->trans; if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { - atomic_fetch_add_32(&trans->queryReqNum, 1); + atomic_fetch_add_32(&tsTrans.queryReqNum, 1); } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { - atomic_fetch_add_32(&trans->submitReqNum, 1); + atomic_fetch_add_32(&tsTrans.submitReqNum, 1); } else {} - RpcMsgFp fp = trans->shellMsgFp[pMsg->msgType]; + RpcMsgFp fp = tsTrans.shellMsgFp[pMsg->msgType]; if (fp != NULL) { (*fp)(pMsg); } else { @@ -247,27 +255,23 @@ static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secr } void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { - SDnode *dnode = dnodeInst(); - rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL); + rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { - SDnode * dnode = dnodeInst(); SRpcEpSet epSet = {0}; - dnodeGetEpSetForPeer(dnode->meps, &epSet); + dnodeGetEpSetForPeer(&epSet); dnodeSendMsgToDnode(&epSet, rpcMsg); } void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { - SDnode * dnode = dnodeInst(); SRpcEpSet epSet = {0}; - dnodeGetEpSetForPeer(dnode->meps, &epSet); - rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp); + dnodeGetEpSetForPeer(&epSet); + rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); } void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { - SDnode *dnode = dnodeInst(); - rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp); + rpcSendRecv(tsTrans.clientRpc, epSet, rpcMsg, rpcRsp); } static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { @@ -303,52 +307,52 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c return rpcRsp.code; } -int32_t dnodeInitShell(SDnTrans *trans) { - trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; +int32_t dnodeInitShell() { + tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; // the following message shall be treated as mnode write - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; // the following message shall be treated as mnode query - trans->shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg; - trans->shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { @@ -366,8 +370,8 @@ int32_t dnodeInitShell(SDnTrans *trans) { rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.afp = dnodeRetrieveUserAuthInfo; - trans->shellRpc = rpcOpen(&rpcInit); - if (trans->shellRpc == NULL) { + tsTrans.shellRpc = rpcOpen(&rpcInit); + if (tsTrans.shellRpc == NULL) { dError("failed to init shell rpc server"); return -1; } @@ -376,41 +380,31 @@ int32_t dnodeInitShell(SDnTrans *trans) { return 0; } -void dnodeCleanupShell(SDnTrans *trans) { - if (trans->shellRpc) { - rpcClose(trans->shellRpc); - trans->shellRpc = NULL; +void dnodeCleanupShell() { + if (tsTrans.shellRpc) { + rpcClose(tsTrans.shellRpc); + tsTrans.shellRpc = NULL; } } -int32_t dnodeInitTrans(SDnTrans **out) { - SDnTrans *trans = calloc(1, sizeof(SDnTrans)); - if (trans == NULL) return -1; - - *out = trans; - - if (dnodeInitClient(trans) != 0) { +int32_t dnodeInitTrans() { + if (dnodeInitClient() != 0) { return -1; } - if (dnodeInitServer(trans) != 0) { + if (dnodeInitServer() != 0) { return -1; } - if (dnodeInitShell(trans) != 0) { + if (dnodeInitShell() != 0) { return -1; } return 0; } -void dnodeCleanupTrans(SDnTrans **out) { - SDnTrans* trans = *out; - *out = NULL; - - dnodeCleanupShell(trans); - dnodeCleanupServer(trans); - dnodeCleanupClient(trans); - - free(trans); +void dnodeCleanupTrans() { + dnodeCleanupShell(); + dnodeCleanupServer(); + dnodeCleanupClient(); } diff --git a/source/util/src/tstep.c b/source/util/src/tstep.c index b04135194a..656ac658a0 100644 --- a/source/util/src/tstep.c +++ b/source/util/src/tstep.c @@ -20,15 +20,14 @@ typedef struct SStepObj { char * name; - void ** self; InitFp initFp; CleanupFp cleanupFp; } SStep; typedef struct SSteps { - int32_t cursize; - int32_t maxsize; - SStep * steps; + int32_t cursize; + int32_t maxsize; + SStep * steps; ReportFp reportFp; } SSteps; @@ -44,14 +43,14 @@ SSteps *taosStepInit(int32_t maxsize, ReportFp fp) { return steps; } -int32_t taosStepAdd(struct SSteps *steps, char *name, void **obj, InitFp initFp, CleanupFp cleanupFp) { +int32_t taosStepAdd(struct SSteps *steps, char *name, InitFp initFp, CleanupFp cleanupFp) { if (steps == NULL) return -1; if (steps->cursize >= steps->maxsize) { uError("failed to add step since up to the maxsize"); return -1; } - SStep step = {.name = name, .self = obj, .initFp = initFp, .cleanupFp = cleanupFp}; + SStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp}; steps->steps[steps->cursize++] = step; return 0; } @@ -61,7 +60,7 @@ static void taosStepCleanupImp(SSteps *steps, int32_t pos) { SStep *step = steps->steps + s; uDebug("step:%s will cleanup", step->name); if (step->cleanupFp != NULL) { - (*step->cleanupFp)(step->self); + (*step->cleanupFp)(); } } } @@ -77,7 +76,7 @@ int32_t taosStepExec(SSteps *steps) { (*steps->reportFp)(step->name, "start initialize"); } - int32_t code = (*step->initFp)(step->self); + int32_t code = (*step->initFp)(); if (code != 0) { uDebug("step:%s will cleanup", step->name); taosStepCleanupImp(steps, s); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 5370e78c09..9e21583895 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -19,37 +19,16 @@ #include "tqueue.h" #include "tworker.h" -static void *taosWorkerThreadFp(void *wparam) { - SWorker * worker = wparam; - SWorkerPool *pool = worker->pool; - void * msg = NULL; - int32_t qtype = 0; - void * ahandle = NULL; - int32_t code = 0; - - setThreadName(pool->name); - - while (1) { - if (taosReadQitemFromQset(pool->qset, &qtype, (void **)&msg, &ahandle) == 0) { - uDebug("pool:%s, worker:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); - break; - } - - code = (*pool->reqFp)(ahandle, msg); - (*pool->rspFp)(ahandle, msg, qtype, code); - } - - return NULL; -} +typedef void* (*ThreadFp)(void *param); int32_t tWorkerInit(SWorkerPool *pool) { pool->qset = taosOpenQset(); pool->workers = calloc(sizeof(SWorker), pool->max); pthread_mutex_init(&pool->mutex, NULL); for (int i = 0; i < pool->max; ++i) { - SWorker *pWorker = pool->workers + i; - pWorker->id = i; - pWorker->pool = pool; + SWorker *worker = pool->workers + i; + worker->id = i; + worker->pool = pool; } uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max); @@ -58,16 +37,16 @@ int32_t tWorkerInit(SWorkerPool *pool) { void tWorkerCleanup(SWorkerPool *pool) { for (int i = 0; i < pool->max; ++i) { - SWorker *pWorker = pool->workers + i; - if(taosCheckPthreadValid(pWorker->thread)) { + SWorker *worker = pool->workers + i; + if (taosCheckPthreadValid(worker->thread)) { taosQsetThreadResume(pool->qset); } } for (int i = 0; i < pool->max; ++i) { - SWorker *pWorker = pool->workers + i; - if (taosCheckPthreadValid(pWorker->thread)) { - pthread_join(pWorker->thread, NULL); + SWorker *worker = pool->workers + i; + if (taosCheckPthreadValid(worker->thread)) { + pthread_join(worker->thread, NULL); } } @@ -78,42 +57,204 @@ void tWorkerCleanup(SWorkerPool *pool) { uInfo("worker:%s is closed", pool->name); } -void *tWorkerAllocQueue(SWorkerPool *pool, void *ahandle) { +static void *tWorkerThreadFp(SWorker *worker) { + SWorkerPool *pool = worker->pool; + + void * msg = NULL; + void * ahandle = NULL; + int32_t qtype = 0; + int32_t code = 0; + + taosBlockSIGPIPE(); + setThreadName(pool->name); + uDebug("worker:%s:%d is running", pool->name, worker->id); + + while (1) { + if (taosReadQitemFromQset(pool->qset, &qtype, (void **)&msg, &ahandle) == 0) { + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); + break; + } + + code = (*pool->startFp)(ahandle, msg, qtype); + (*pool->endFp)(ahandle, msg, qtype, code); + } + + return NULL; +} + +taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle) { pthread_mutex_lock(&pool->mutex); - taos_queue pQueue = taosOpenQueue(); - if (pQueue == NULL) { + taos_queue queue = taosOpenQueue(); + if (queue == NULL) { pthread_mutex_unlock(&pool->mutex); return NULL; } - taosAddIntoQset(pool->qset, pQueue, ahandle); + taosAddIntoQset(pool->qset, queue, ahandle); // spawn a thread to process queue if (pool->num < pool->max) { do { - SWorker *pWorker = pool->workers + pool->num; + SWorker *worker = pool->workers + pool->num; pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pthread_create(&pWorker->thread, &thAttr, taosWorkerThreadFp, pWorker) != 0) { - uError("workers:%s:%d failed to create thread to process since %s", pool->name, pWorker->id, strerror(errno)); + if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWorkerThreadFp, worker) != 0) { + uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); } pthread_attr_destroy(&thAttr); pool->num++; - uDebug("workers:%s:%d is launched, total:%d", pool->name, pWorker->id, pool->num); + uDebug("worker:%s:%d is launched, total:%d", pool->name, worker->id, pool->num); } while (pool->num < pool->min); } pthread_mutex_unlock(&pool->mutex); - uDebug("workers:%s, queue:%p is allocated, ahandle:%p", pool->name, pQueue, ahandle); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); - return pQueue; + return queue; } -void tWorkerFreeQueue(SWorkerPool *pool, void *pQueue) { - taosCloseQueue(pQueue); - uDebug("workers:%s, queue:%p is freed", pool->name, pQueue); +void tWorkerFreeQueue(SWorkerPool *pool, void *queue) { + taosCloseQueue(queue); + uDebug("worker:%s, queue:%p is freed", pool->name, queue); +} + +int32_t tWriteWorkerInit(SWriteWorkerPool *pool) { + pool->nextId = 0; + pool->workers = calloc(sizeof(SWriteWorker), pool->max); + if (pool->workers == NULL) return -1; + + pthread_mutex_init(&pool->mutex, NULL); + for (int32_t i = 0; i < pool->max; ++i) { + SWriteWorker *worker = pool->workers + i; + worker->id = i; + worker->qall = NULL; + worker->qset = NULL; + worker->pool = pool; + } + + uInfo("worker:%s is initialized, max:%d", pool->name, pool->max); + return 0; +} + +void tWriteWorkerCleanup(SWriteWorkerPool *pool) { + for (int32_t i = 0; i < pool->max; ++i) { + SWriteWorker *worker = pool->workers + i; + if (taosCheckPthreadValid(worker->thread)) { + if (worker->qset) taosQsetThreadResume(worker->qset); + } + } + + for (int32_t i = 0; i < pool->max; ++i) { + SWriteWorker *worker = pool->workers + i; + if (taosCheckPthreadValid(worker->thread)) { + pthread_join(worker->thread, NULL); + taosFreeQall(worker->qall); + taosCloseQset(worker->qset); + } + } + + free(pool->workers); + pthread_mutex_destroy(&pool->mutex); + + uInfo("worker:%s is closed", pool->name); +} + +static void *tWriteWorkerThreadFp(SWriteWorker *worker) { + SWriteWorkerPool *pool = worker->pool; + + void * msg = NULL; + void * ahandle = NULL; + int32_t numOfMsgs = 0; + int32_t qtype = 0; + + taosBlockSIGPIPE(); + setThreadName(pool->name); + uDebug("worker:%s:%d is running", pool->name, worker->id); + + while (1) { + numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle); + if (numOfMsgs == 0) { + uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); + break; + } + + bool fsync = false; + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(worker->qall, &qtype, (void **)&msg); + fsync = fsync | (*pool->startFp)(ahandle, msg, qtype); + } + + (*pool->syncFp)(ahandle, fsync); + + // browse all items, and process them one by one + taosResetQitems(worker->qall); + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(worker->qall, &qtype, (void **)&msg); + (*pool->endFp)(ahandle, msg, qtype); + } + } + + return NULL; +} + +taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle) { + pthread_mutex_lock(&pool->mutex); + SWriteWorker *worker = pool->workers + pool->nextId; + + taos_queue *queue = taosOpenQueue(); + if (queue == NULL) { + pthread_mutex_unlock(&pool->mutex); + return NULL; + } + + if (worker->qset == NULL) { + worker->qset = taosOpenQset(); + if (worker->qset == NULL) { + taosCloseQueue(queue); + pthread_mutex_unlock(&pool->mutex); + return NULL; + } + + taosAddIntoQset(worker->qset, queue, ahandle); + worker->qall = taosAllocateQall(); + if (worker->qall == NULL) { + taosCloseQset(worker->qset); + taosCloseQueue(queue); + pthread_mutex_unlock(&pool->mutex); + return NULL; + } + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&worker->thread, &thAttr, (ThreadFp)tWriteWorkerThreadFp, worker) != 0) { + uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); + taosFreeQall(worker->qall); + taosCloseQset(worker->qset); + taosCloseQueue(queue); + queue = NULL; + } else { + uDebug("worker:%s:%d is launched, max:%d", pool->name, worker->id, pool->max); + pool->nextId = (pool->nextId + 1) % pool->max; + } + + pthread_attr_destroy(&thAttr); + } else { + taosAddIntoQset(worker->qset, queue, ahandle); + pool->nextId = (pool->nextId + 1) % pool->max; + } + + pthread_mutex_unlock(&pool->mutex); + uDebug("worker:%s, queue:%p is allocated, ahandle:%p", pool->name, queue, ahandle); + + return queue; +} + +void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue) { + taosCloseQueue(queue); + uDebug("worker:%s, queue:%p is freed", pool->name, queue); }