rename some variables

This commit is contained in:
Shengliang Guan 2021-10-06 18:24:48 +08:00
parent 2cde37fe41
commit 4c7220e0db
19 changed files with 156 additions and 160 deletions

View File

@ -118,7 +118,7 @@ typedef struct {
int32_t mnodeGetStatistics(SMnodeStat *stat); int32_t mnodeGetStatistics(SMnodeStat *stat);
/** /**
* Get the statistical information of Mnode. * Get the auth information.
* *
* @param user, username. * @param user, username.
* @param spi, security parameter index. * @param spi, security parameter index.

View File

@ -21,21 +21,21 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnCfg { typedef struct SDnCfg {
int32_t dnodeId; int32_t dnodeId;
int32_t dropped; int32_t dropped;
char clusterId[TSDB_CLUSTER_ID_LEN]; char clusterId[TSDB_CLUSTER_ID_LEN];
char file[PATH_MAX + 20]; char file[PATH_MAX + 20];
pthread_mutex_t mutex; pthread_mutex_t mutex;
} DnCfg; } SDnCfg;
int32_t dnodeInitCfg(DnCfg **cfg); int32_t dnodeInitCfg(SDnCfg **cfg);
void dnodeCleanupCfg(DnCfg **cfg); void dnodeCleanupCfg(SDnCfg **cfg);
void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data); void dnodeUpdateCfg(SDnCfg *cfg, SDnodeCfg *data);
int32_t dnodeGetDnodeId(DnCfg *cfg); int32_t dnodeGetDnodeId(SDnCfg *cfg);
void dnodeGetClusterId(DnCfg *cfg, char *clusterId); void dnodeGetClusterId(SDnCfg *cfg, char *clusterId);
void dnodeGetCfg(DnCfg *cfg, int32_t *dnodeId, char *clusterId); void dnodeGetCfg(SDnCfg *cfg, int32_t *dnodeId, char *clusterId);
void dnodeSetDropped(DnCfg *cfg); void dnodeSetDropped(SDnCfg *cfg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -21,11 +21,11 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnCheck { typedef struct SDnCheck {
} DnCheck; } SDnCheck;
int32_t dnodeInitCheck(DnCheck **check); int32_t dnodeInitCheck(SDnCheck **check);
void dnodeCleanupCheck(DnCheck **check); void dnodeCleanupCheck(SDnCheck **check);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,19 +22,19 @@ extern "C" {
#include "hash.h" #include "hash.h"
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnEps { typedef struct SDnEps {
int32_t dnodeId; int32_t dnodeId;
int32_t dnodeNum; int32_t dnodeNum;
SDnodeEp * dnodeList; SDnodeEp * dnodeList;
SHashObj * dnodeHash; SHashObj * dnodeHash;
char file[PATH_MAX + 20]; char file[PATH_MAX + 20];
pthread_mutex_t mutex; pthread_mutex_t mutex;
} DnEps; } SDnEps;
int32_t dnodeInitEps(DnEps **eps); int32_t dnodeInitEps(SDnEps **eps);
void dnodeCleanupEps(DnEps **eps); void dnodeCleanupEps(SDnEps **eps);
void dnodeUpdateEps(DnEps *eps, SDnodeEps *data); void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data);
bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr); bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr);
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port); void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -27,32 +27,28 @@ extern "C" {
#include "tstep.h" #include "tstep.h"
#include "dnode.h" #include "dnode.h"
struct DnCfg; struct SDnCfg;
struct DnCheck; struct SDnCheck;
struct DnEps; struct SDnEps;
struct DnMnEps; struct SDnMnEps;
struct DnStatus; struct SDnStatus;
struct DnTelem; struct SDnTelem;
struct DnTrans; struct SDnTrans;
struct DnMain; struct SDnMain;
struct Mnode;
struct Vnode;
typedef struct Dnode { typedef struct SDnode {
struct SSteps * steps; struct SSteps* steps;
struct DnCfg * cfg; struct SDnCfg* cfg;
struct DnCheck * check; struct SDnCheck* check;
struct DnEps * eps; struct SDnEps* eps;
struct DnMnEps * meps; struct SDnMnEps* meps;
struct DnStatus *status; struct SDnStatus* status;
struct DnTelem * telem; struct SDnTelem* telem;
struct DnTrans * trans; struct SDnTrans* trans;
struct DnMain * main; struct SDnMain* main;
struct Mnode * mnode; } SDnode;
struct Vnode * vnode;
} Dnode;
Dnode* dnodeInst(); SDnode* dnodeInst();
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }} #define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", 255, __VA_ARGS__); }}
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }} #define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", 255, __VA_ARGS__); }}

View File

@ -27,14 +27,14 @@ typedef enum {
TD_RUN_STAT_STOPPED TD_RUN_STAT_STOPPED
} RunStat; } RunStat;
typedef struct DnMain { typedef struct SDnMain {
RunStat runStatus; RunStat runStatus;
void * dnodeTimer; void * dnodeTimer;
SStartupStep startup; SStartupStep startup;
} DnMain; } SDnMain;
int32_t dnodeInitMain(DnMain **main); int32_t dnodeInitMain(SDnMain **main);
void dnodeCleanupMain(DnMain **main); void dnodeCleanupMain(SDnMain **main);
int32_t dnodeInitStorage(); int32_t dnodeInitStorage();
void dnodeCleanupStorage(); void dnodeCleanupStorage();
void dnodeReportStartup(char *name, char *desc); void dnodeReportStartup(char *name, char *desc);

View File

@ -21,19 +21,19 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnMnEps { typedef struct SDnMnEps {
SRpcEpSet mnodeEpSet; SRpcEpSet mnodeEpSet;
SMInfos mnodeInfos; SMInfos mnodeInfos;
char file[PATH_MAX + 20]; char file[PATH_MAX + 20];
pthread_mutex_t mutex; pthread_mutex_t mutex;
} DnMnEps; } SDnMnEps;
int32_t dnodeInitMnodeEps(DnMnEps **meps); int32_t dnodeInitMnodeEps(SDnMnEps **meps);
void dnodeCleanupMnodeEps(DnMnEps **meps); void dnodeCleanupMnodeEps(SDnMnEps **meps);
void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *pMinfos); void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *pMinfos);
void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *pEpSet); void dnodeUpdateMnodeFromPeer(SDnMnEps *meps, SRpcEpSet *pEpSet);
void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet); void dnodeGetEpSetForPeer(SDnMnEps *meps, SRpcEpSet *epSet);
void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet); void dnodeGetEpSetForShell(SDnMnEps *meps, SRpcEpSet *epSet);
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -21,14 +21,14 @@ extern "C" {
#endif #endif
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct DnStatus { typedef struct SDnStatus {
void * dnodeTimer; void * dnodeTimer;
void * statusTimer; void * statusTimer;
uint32_t rebootTime; uint32_t rebootTime;
} DnStatus; } SDnStatus;
int32_t dnodeInitStatus(DnStatus **status); int32_t dnodeInitStatus(SDnStatus **status);
void dnodeCleanupStatus(DnStatus **status); void dnodeCleanupStatus(SDnStatus **status);
void dnodeProcessStatusRsp(SRpcMsg *pMsg); void dnodeProcessStatusRsp(SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -25,17 +25,17 @@ extern "C" {
* sem_timedwait is NOT implemented on MacOSX * sem_timedwait is NOT implemented on MacOSX
* thus we use pthread_mutex_t/pthread_cond_t to simulate * thus we use pthread_mutex_t/pthread_cond_t to simulate
*/ */
typedef struct DnTelem { typedef struct SDnTelem {
bool enable; bool enable;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
volatile int32_t exit; volatile int32_t exit;
pthread_t thread; pthread_t thread;
char email[TSDB_FQDN_LEN]; char email[TSDB_FQDN_LEN];
} DnTelem; } SDnTelem;
int32_t dnodeInitTelem(DnTelem **telem); int32_t dnodeInitTelem(SDnTelem **telem);
void dnodeCleanupTelem(DnTelem **telem); void dnodeCleanupTelem(SDnTelem **telem);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -23,7 +23,7 @@ extern "C" {
typedef void (*RpcMsgFp)( SRpcMsg *pMsg); typedef void (*RpcMsgFp)( SRpcMsg *pMsg);
typedef struct DnTrans { typedef struct SDnTrans {
void * serverRpc; void * serverRpc;
void * clientRpc; void * clientRpc;
void * shellRpc; void * shellRpc;
@ -31,11 +31,10 @@ typedef struct DnTrans {
int32_t submitReqNum; int32_t submitReqNum;
RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX];
RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX]; RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX];
} SDnTrans;
} DnTrans; int32_t dnodeInitTrans(SDnTrans **rans);
void dnodeCleanupTrans(SDnTrans **trans);
int32_t dnodeInitTrans(DnTrans **rans);
void dnodeCleanupTrans(DnTrans **trans);
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet); void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);

View File

@ -18,7 +18,7 @@
#include "cJSON.h" #include "cJSON.h"
#include "dnodeCfg.h" #include "dnodeCfg.h"
static int32_t dnodeReadCfg(DnCfg *cfg) { static int32_t dnodeReadCfg(SDnCfg *cfg) {
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 200; int32_t maxLen = 200;
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
@ -76,7 +76,7 @@ PARSE_CFG_OVER:
return 0; return 0;
} }
static int32_t dnodeWriteCfg(DnCfg *cfg) { static int32_t dnodeWriteCfg(SDnCfg *cfg) {
FILE *fp = fopen(cfg->file, "w"); FILE *fp = fopen(cfg->file, "w");
if (!fp) { if (!fp) {
dError("failed to write %s since %s", cfg->file, strerror(errno)); dError("failed to write %s since %s", cfg->file, strerror(errno));
@ -103,8 +103,8 @@ static int32_t dnodeWriteCfg(DnCfg *cfg) {
return 0; return 0;
} }
int32_t dnodeInitCfg(DnCfg **out) { int32_t dnodeInitCfg(SDnCfg **out) {
DnCfg* cfg = calloc(1, sizeof(DnCfg)); SDnCfg* cfg = calloc(1, sizeof(SDnCfg));
if (cfg == NULL) return -1; if (cfg == NULL) return -1;
cfg->dnodeId = 0; cfg->dnodeId = 0;
@ -127,15 +127,15 @@ int32_t dnodeInitCfg(DnCfg **out) {
return ret; return ret;
} }
void dnodeCleanupCfg(DnCfg **out) { void dnodeCleanupCfg(SDnCfg **out) {
DnCfg* cfg = *out; SDnCfg* cfg = *out;
*out = NULL; *out = NULL;
pthread_mutex_destroy(&cfg->mutex); pthread_mutex_destroy(&cfg->mutex);
free(cfg); free(cfg);
} }
void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data) { void dnodeUpdateCfg(SDnCfg *cfg, SDnodeCfg *data) {
if (cfg == NULL || cfg->dnodeId == 0) return; if (cfg == NULL || cfg->dnodeId == 0) return;
pthread_mutex_lock(&cfg->mutex); pthread_mutex_lock(&cfg->mutex);
@ -148,14 +148,14 @@ void dnodeUpdateCfg(DnCfg *cfg, SDnodeCfg *data) {
pthread_mutex_unlock(&cfg->mutex); pthread_mutex_unlock(&cfg->mutex);
} }
void dnodeSetDropped(DnCfg *cfg) { void dnodeSetDropped(SDnCfg *cfg) {
pthread_mutex_lock(&cfg->mutex); pthread_mutex_lock(&cfg->mutex);
cfg->dropped = 1; cfg->dropped = 1;
dnodeWriteCfg(cfg); dnodeWriteCfg(cfg);
pthread_mutex_unlock(&cfg->mutex); pthread_mutex_unlock(&cfg->mutex);
} }
int32_t dnodeGetDnodeId(DnCfg *cfg) { int32_t dnodeGetDnodeId(SDnCfg *cfg) {
int32_t dnodeId = 0; int32_t dnodeId = 0;
pthread_mutex_lock(&cfg->mutex); pthread_mutex_lock(&cfg->mutex);
dnodeId = cfg->dnodeId; dnodeId = cfg->dnodeId;
@ -163,13 +163,13 @@ int32_t dnodeGetDnodeId(DnCfg *cfg) {
return dnodeId; return dnodeId;
} }
void dnodeGetClusterId(DnCfg *cfg, char *clusterId) { void dnodeGetClusterId(SDnCfg *cfg, char *clusterId) {
pthread_mutex_lock(&cfg->mutex); pthread_mutex_lock(&cfg->mutex);
tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
pthread_mutex_unlock(&cfg->mutex); pthread_mutex_unlock(&cfg->mutex);
} }
void dnodeGetCfg(DnCfg *cfg, int32_t *dnodeId, char *clusterId) { void dnodeGetCfg(SDnCfg *cfg, int32_t *dnodeId, char *clusterId) {
pthread_mutex_lock(&cfg->mutex); pthread_mutex_lock(&cfg->mutex);
*dnodeId = cfg->dnodeId; *dnodeId = cfg->dnodeId;
tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); tstrncpy(clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);

View File

@ -145,8 +145,8 @@ static int32_t dnodeCheckAccess() { return 0; }
static int32_t dnodeCheckVersion() { return 0; } static int32_t dnodeCheckVersion() { return 0; }
static int32_t dnodeCheckDatafile() { return 0; } static int32_t dnodeCheckDatafile() { return 0; }
int32_t dnodeInitCheck(DnCheck **out) { int32_t dnodeInitCheck(SDnCheck **out) {
DnCheck *check = calloc(1, sizeof(DnCheck)); SDnCheck *check = calloc(1, sizeof(SDnCheck));
if (check == NULL) return -1; if (check == NULL) return -1;
*out = check; *out = check;
@ -195,8 +195,8 @@ int32_t dnodeInitCheck(DnCheck **out) {
return 0; return 0;
} }
void dnodeCleanupCheck(DnCheck **out) { void dnodeCleanupCheck(SDnCheck **out) {
DnCheck *check = *out; SDnCheck *check = *out;
*out = NULL; *out = NULL;
free(check); free(check);

View File

@ -20,7 +20,7 @@
#include "dnodeEps.h" #include "dnodeEps.h"
#include "dnodeCfg.h" #include "dnodeCfg.h"
static void dnodePrintEps(DnEps *eps) { static void dnodePrintEps(SDnEps *eps) {
dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum); dDebug("print dnodeEp, dnodeNum:%d", eps->dnodeNum);
for (int32_t i = 0; i < eps->dnodeNum; i++) { for (int32_t i = 0; i < eps->dnodeNum; i++) {
SDnodeEp *ep = &eps->dnodeList[i]; SDnodeEp *ep = &eps->dnodeList[i];
@ -28,7 +28,7 @@ static void dnodePrintEps(DnEps *eps) {
} }
} }
static void dnodeResetEps(DnEps *eps, SDnodeEps *data) { static void dnodeResetEps(SDnEps *eps, SDnodeEps *data) {
assert(data != NULL); assert(data != NULL);
if (data->dnodeNum > eps->dnodeNum) { if (data->dnodeNum > eps->dnodeNum) {
@ -48,7 +48,7 @@ static void dnodeResetEps(DnEps *eps, SDnodeEps *data) {
} }
} }
static int32_t dnodeReadEps(DnEps *eps) { static int32_t dnodeReadEps(SDnEps *eps) {
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 30000; int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
@ -145,7 +145,7 @@ PRASE_EPS_OVER:
return 0; return 0;
} }
static int32_t dnodeWriteEps(DnEps *eps) { static int32_t dnodeWriteEps(SDnEps *eps) {
FILE *fp = fopen(eps->file, "w"); FILE *fp = fopen(eps->file, "w");
if (!fp) { if (!fp) {
dError("failed to write %s since %s", eps->file, strerror(errno)); dError("failed to write %s since %s", eps->file, strerror(errno));
@ -182,8 +182,8 @@ static int32_t dnodeWriteEps(DnEps *eps) {
return 0; return 0;
} }
int32_t dnodeInitEps(DnEps **out) { int32_t dnodeInitEps(SDnEps **out) {
DnEps *eps = calloc(1, sizeof(DnEps)); SDnEps *eps = calloc(1, sizeof(SDnEps));
if (eps == NULL) return -1; if (eps == NULL) return -1;
eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); eps->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
@ -203,8 +203,8 @@ int32_t dnodeInitEps(DnEps **out) {
return ret; return ret;
} }
void dnodeCleanupEps(DnEps **out) { void dnodeCleanupEps(SDnEps **out) {
DnEps *eps = *out; SDnEps *eps = *out;
*out = NULL; *out = NULL;
pthread_mutex_lock(&eps->mutex); pthread_mutex_lock(&eps->mutex);
@ -225,7 +225,7 @@ void dnodeCleanupEps(DnEps **out) {
free(eps); free(eps);
} }
void dnodeUpdateEps(DnEps *eps, SDnodeEps *data) { void dnodeUpdateEps(SDnEps *eps, SDnodeEps *data) {
if (data == NULL || data->dnodeNum <= 0) return; if (data == NULL || data->dnodeNum <= 0) return;
data->dnodeNum = htonl(data->dnodeNum); data->dnodeNum = htonl(data->dnodeNum);
@ -250,7 +250,7 @@ void dnodeUpdateEps(DnEps *eps, SDnodeEps *data) {
pthread_mutex_unlock(&eps->mutex); pthread_mutex_unlock(&eps->mutex);
} }
bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) { bool dnodeIsDnodeEpChanged(SDnEps *eps, int32_t dnodeId, char *epstr) {
bool changed = false; bool changed = false;
pthread_mutex_lock(&eps->mutex); pthread_mutex_lock(&eps->mutex);
@ -269,7 +269,7 @@ bool dnodeIsDnodeEpChanged(DnEps *eps, int32_t dnodeId, char *epstr) {
} }
void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) { void dnodeGetDnodeEp(int32_t dnodeId, char *epstr, char *fqdn, uint16_t *port) {
DnEps *eps = dnodeInst()->eps; SDnEps *eps = dnodeInst()->eps;
pthread_mutex_lock(&eps->mutex); pthread_mutex_lock(&eps->mutex);
SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t)); SDnodeEp *ep = taosHashGet(eps->dnodeHash, &dnodeId, sizeof(int32_t));

View File

@ -33,9 +33,10 @@
#include "mnode.h" #include "mnode.h"
#include "vnode.h" #include "vnode.h"
static Dnode tsDnode = {0}; SDnode *dnodeInst() {
static SDnode inst = {0};
Dnode *dnodeInst() { return &tsDnode; } return &inst;
}
static int32_t dnodeInitVnodeModule(void **unused) { static int32_t dnodeInitVnodeModule(void **unused) {
SVnodePara para; SVnodePara para;
@ -47,7 +48,7 @@ static int32_t dnodeInitVnodeModule(void **unused) {
} }
static int32_t dnodeInitMnodeModule(void **unused) { static int32_t dnodeInitMnodeModule(void **unused) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
SMnodePara para; SMnodePara para;
para.fp.GetDnodeEp = dnodeGetDnodeEp; para.fp.GetDnodeEp = dnodeGetDnodeEp;
@ -64,7 +65,7 @@ int32_t dnodeInit() {
struct SSteps *steps = taosStepInit(24, dnodeReportStartup); struct SSteps *steps = taosStepInit(24, dnodeReportStartup);
if (steps == NULL) return -1; if (steps == NULL) return -1;
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
taosStepAdd(steps, "dnode-main", (void **)&dnode->main, (InitFp)dnodeInitMain, (CleanupFp)dnodeCleanupMain); taosStepAdd(steps, "dnode-main", (void **)&dnode->main, (InitFp)dnodeInitMain, (CleanupFp)dnodeCleanupMain);
taosStepAdd(steps, "dnode-storage", NULL, (InitFp)dnodeInitStorage, (CleanupFp)dnodeCleanupStorage); taosStepAdd(steps, "dnode-storage", NULL, (InitFp)dnodeInitStorage, (CleanupFp)dnodeCleanupStorage);
@ -96,7 +97,7 @@ int32_t dnodeInit() {
} }
void dnodeCleanup() { void dnodeCleanup() {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) { if (dnode->main->runStatus != TD_RUN_STAT_STOPPED) {
dnode->main->runStatus = TD_RUN_STAT_STOPPED; dnode->main->runStatus = TD_RUN_STAT_STOPPED;
taosStepCleanup(dnode->steps); taosStepCleanup(dnode->steps);

View File

@ -55,8 +55,8 @@ void dnodePrintDiskInfo() {
dInfo("=================================="); dInfo("==================================");
} }
int32_t dnodeInitMain(DnMain **out) { int32_t dnodeInitMain(SDnMain **out) {
DnMain* main = calloc(1, sizeof(DnMain)); SDnMain* main = calloc(1, sizeof(SDnMain));
if (main == NULL) return -1; if (main == NULL) return -1;
main->runStatus = TD_RUN_STAT_STOPPED; main->runStatus = TD_RUN_STAT_STOPPED;
@ -101,8 +101,8 @@ int32_t dnodeInitMain(DnMain **out) {
return taosCheckGlobalCfg(); return taosCheckGlobalCfg();
} }
void dnodeCleanupMain(DnMain **out) { void dnodeCleanupMain(SDnMain **out) {
DnMain *main = *out; SDnMain *main = *out;
*out = NULL; *out = NULL;
if (main->dnodeTimer != NULL) { if (main->dnodeTimer != NULL) {
@ -202,7 +202,7 @@ void dnodeCleanupStorage() {
} }
void dnodeReportStartup(char *name, char *desc) { void dnodeReportStartup(char *name, char *desc) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
if (dnode->main != NULL) { if (dnode->main != NULL) {
SStartupStep *startup = &dnode->main->startup; SStartupStep *startup = &dnode->main->startup;
tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->name, name, strlen(startup->name));
@ -212,7 +212,7 @@ void dnodeReportStartup(char *name, char *desc) {
} }
void dnodeReportStartupFinished(char *name, char *desc) { void dnodeReportStartupFinished(char *name, char *desc) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
SStartupStep *startup = &dnode->main->startup; SStartupStep *startup = &dnode->main->startup;
tstrncpy(startup->name, name, strlen(startup->name)); tstrncpy(startup->name, name, strlen(startup->name));
tstrncpy(startup->desc, desc, strlen(startup->desc)); tstrncpy(startup->desc, desc, strlen(startup->desc));
@ -222,7 +222,7 @@ void dnodeReportStartupFinished(char *name, char *desc) {
void dnodeProcessStartupReq(SRpcMsg *pMsg) { void dnodeProcessStartupReq(SRpcMsg *pMsg) {
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont); dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep)); SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep)); memcpy(pStep, &dnode->main->startup, sizeof(SStartupStep));
@ -234,7 +234,7 @@ void dnodeProcessStartupReq(SRpcMsg *pMsg) {
} }
static int32_t dnodeStartMnode(SRpcMsg *pMsg) { static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
SCreateMnodeMsg *pCfg = pMsg->pCont; SCreateMnodeMsg *pCfg = pMsg->pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId); pCfg->dnodeId = htonl(pCfg->dnodeId);
if (pCfg->dnodeId != dnode->cfg->dnodeId) { if (pCfg->dnodeId != dnode->cfg->dnodeId) {
@ -254,7 +254,7 @@ static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp); dDebug("meps index:%d, meps:%d:%s", i, pCfg->mnodes.mnodeInfos[i].mnodeId, pCfg->mnodes.mnodeInfos[i].mnodeEp);
} }
if (mnodeIsServing(dnode->mnode)) return 0; if (mnodeIsServing()) return 0;
return mnodeDeploy(&pCfg->mnodes); return mnodeDeploy(&pCfg->mnodes);
} }

View File

@ -22,7 +22,7 @@
#include "dnodeMnodeEps.h" #include "dnodeMnodeEps.h"
#include "mnode.h" #include "mnode.h"
static void dnodePrintMnodeEps(DnMnEps *meps) { static void dnodePrintMnodeEps(SDnMnEps *meps) {
SRpcEpSet *epset = &meps->mnodeEpSet; SRpcEpSet *epset = &meps->mnodeEpSet;
dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse); dInfo("print mnode eps, num:%d inuse:%d", epset->numOfEps, epset->inUse);
for (int32_t i = 0; i < epset->numOfEps; i++) { for (int32_t i = 0; i < epset->numOfEps; i++) {
@ -30,7 +30,7 @@ static void dnodePrintMnodeEps(DnMnEps *meps) {
} }
} }
static void dnodeResetMnodeEps(DnMnEps *meps, SMInfos *mInfos) { static void dnodeResetMnodeEps(SDnMnEps *meps, SMInfos *mInfos) {
if (mInfos == NULL || mInfos->mnodeNum == 0) { if (mInfos == NULL || mInfos->mnodeNum == 0) {
meps->mnodeEpSet.numOfEps = 1; meps->mnodeEpSet.numOfEps = 1;
taosGetFqdnPortFromEp(tsFirst, meps->mnodeEpSet.fqdn[0], &meps->mnodeEpSet.port[0]); taosGetFqdnPortFromEp(tsFirst, meps->mnodeEpSet.fqdn[0], &meps->mnodeEpSet.port[0]);
@ -55,7 +55,7 @@ static void dnodeResetMnodeEps(DnMnEps *meps, SMInfos *mInfos) {
dnodePrintMnodeEps(meps); dnodePrintMnodeEps(meps);
} }
static int32_t dnodeWriteMnodeEps(DnMnEps *meps) { static int32_t dnodeWriteMnodeEps(SDnMnEps *meps) {
FILE *fp = fopen(meps->file, "w"); FILE *fp = fopen(meps->file, "w");
if (!fp) { if (!fp) {
dError("failed to write %s since %s", meps->file, strerror(errno)); dError("failed to write %s since %s", meps->file, strerror(errno));
@ -91,7 +91,7 @@ static int32_t dnodeWriteMnodeEps(DnMnEps *meps) {
return 0; return 0;
} }
static int32_t dnodeReadMnodeEps(DnMnEps *meps, DnEps *deps) { static int32_t dnodeReadMnodeEps(SDnMnEps *meps, SDnEps *deps) {
int32_t len = 0; int32_t len = 0;
int32_t maxLen = 2000; int32_t maxLen = 2000;
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
@ -192,7 +192,7 @@ PARSE_MINFOS_OVER:
} }
void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
DnMnEps *meps = dnodeInst()->meps; SDnMnEps *meps = dnodeInst()->meps;
SRpcConnInfo connInfo = {0}; SRpcConnInfo connInfo = {0};
rpcGetConnInfo(rpcMsg->handle, &connInfo); rpcGetConnInfo(rpcMsg->handle, &connInfo);
@ -222,8 +222,8 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) {
rpcSendRedirectRsp(rpcMsg->handle, &epSet); rpcSendRedirectRsp(rpcMsg->handle, &epSet);
} }
int32_t dnodeInitMnodeEps(DnMnEps **out) { int32_t dnodeInitMnodeEps(SDnMnEps **out) {
DnMnEps *meps = calloc(1, sizeof(DnMnEps)); SDnMnEps *meps = calloc(1, sizeof(SDnMnEps));
if (meps == NULL) return -1; if (meps == NULL) return -1;
snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir); snprintf(meps->file, sizeof(meps->file), "%s/mnodeEpSet.json", tsDnodeDir);
@ -239,8 +239,8 @@ int32_t dnodeInitMnodeEps(DnMnEps **out) {
return ret; return ret;
} }
void dnodeCleanupMnodeEps(DnMnEps **out) { void dnodeCleanupMnodeEps(SDnMnEps **out) {
DnMnEps *meps = *out; SDnMnEps *meps = *out;
*out = NULL; *out = NULL;
if (meps != NULL) { if (meps != NULL) {
@ -249,7 +249,7 @@ void dnodeCleanupMnodeEps(DnMnEps **out) {
} }
} }
void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *mInfos) { void dnodeUpdateMnodeFromStatus(SDnMnEps *meps, SMInfos *mInfos) {
if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) { if (mInfos->mnodeNum <= 0 || mInfos->mnodeNum > TSDB_MAX_REPLICA) {
dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum); dError("invalid mInfos since num:%d invalid", mInfos->mnodeNum);
return; return;
@ -278,7 +278,7 @@ void dnodeUpdateMnodeFromStatus(DnMnEps *meps, SMInfos *mInfos) {
pthread_mutex_unlock(&meps->mutex); pthread_mutex_unlock(&meps->mutex);
} }
void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *ep) { void dnodeUpdateMnodeFromPeer(SDnMnEps *meps, SRpcEpSet *ep) {
if (ep->numOfEps <= 0) { if (ep->numOfEps <= 0) {
dError("mInfos is changed, but content is invalid, discard it"); dError("mInfos is changed, but content is invalid, discard it");
return; return;
@ -296,7 +296,7 @@ void dnodeUpdateMnodeFromPeer(DnMnEps *meps, SRpcEpSet *ep) {
pthread_mutex_unlock(&meps->mutex); pthread_mutex_unlock(&meps->mutex);
} }
void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet) { void dnodeGetEpSetForPeer(SDnMnEps *meps, SRpcEpSet *epSet) {
pthread_mutex_lock(&meps->mutex); pthread_mutex_lock(&meps->mutex);
*epSet = meps->mnodeEpSet; *epSet = meps->mnodeEpSet;
@ -307,7 +307,7 @@ void dnodeGetEpSetForPeer(DnMnEps *meps, SRpcEpSet *epSet) {
pthread_mutex_unlock(&meps->mutex); pthread_mutex_unlock(&meps->mutex);
} }
void dnodeGetEpSetForShell(DnMnEps *meps, SRpcEpSet *epSet) { void dnodeGetEpSetForShell(SDnMnEps *meps, SRpcEpSet *epSet) {
pthread_mutex_lock(&meps->mutex); pthread_mutex_lock(&meps->mutex);
*epSet = meps->mnodeEpSet; *epSet = meps->mnodeEpSet;

View File

@ -26,7 +26,7 @@
#include "vnode.h" #include "vnode.h"
static void dnodeSendStatusMsg(void *handle, void *tmrId) { static void dnodeSendStatusMsg(void *handle, void *tmrId) {
DnStatus *status = handle; SDnStatus *status = handle;
if (status->dnodeTimer == NULL) { if (status->dnodeTimer == NULL) {
dError("dnode timer is already released"); dError("dnode timer is already released");
return; return;
@ -46,7 +46,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
return; return;
} }
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId); dnodeGetCfg(dnode->cfg, &pStatus->dnodeId, pStatus->clusterId);
pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg)); pStatus->dnodeId = htonl(dnodeGetDnodeId(dnode->cfg));
pStatus->version = htonl(tsVersion); pStatus->version = htonl(tsVersion);
@ -86,8 +86,8 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
} }
void dnodeProcessStatusRsp(SRpcMsg *pMsg) { void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
DnStatus *status = pMsg->ahandle; SDnStatus *status = pMsg->ahandle;
if (pMsg->code != TSDB_CODE_SUCCESS) { if (pMsg->code != TSDB_CODE_SUCCESS) {
dError("status rsp is received, error:%s", tstrerror(pMsg->code)); dError("status rsp is received, error:%s", tstrerror(pMsg->code));
@ -123,8 +123,8 @@ void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer); taosTmrReset(dnodeSendStatusMsg, tsStatusInterval * 1000, status, status->dnodeTimer, &status->statusTimer);
} }
int32_t dnodeInitStatus(DnStatus **out) { int32_t dnodeInitStatus(SDnStatus **out) {
DnStatus *status = calloc(1, sizeof(DnStatus)); SDnStatus *status = calloc(1, sizeof(SDnStatus));
if (status == NULL) return -1; if (status == NULL) return -1;
status->statusTimer = NULL; status->statusTimer = NULL;
status->dnodeTimer = dnodeInst()->main->dnodeTimer; status->dnodeTimer = dnodeInst()->main->dnodeTimer;
@ -135,8 +135,8 @@ int32_t dnodeInitStatus(DnStatus **out) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void dnodeCleanupStatus(DnStatus **out) { void dnodeCleanupStatus(SDnStatus **out) {
DnStatus *status = *out; SDnStatus *status = *out;
*out = NULL; *out = NULL;
if (status->statusTimer != NULL) { if (status->statusTimer != NULL) {

View File

@ -154,14 +154,14 @@ static void dnodeAddMemoryInfo(SBufferWriter* bw) {
fclose(fp); fclose(fp);
} }
static void dnodeAddVersionInfo(DnTelem* telem, SBufferWriter* bw) { static void dnodeAddVersionInfo(SDnTelem* telem, SBufferWriter* bw) {
dnodeAddStringField(bw, "version", version); dnodeAddStringField(bw, "version", version);
dnodeAddStringField(bw, "buildInfo", buildinfo); dnodeAddStringField(bw, "buildInfo", buildinfo);
dnodeAddStringField(bw, "gitInfo", gitinfo); dnodeAddStringField(bw, "gitInfo", gitinfo);
dnodeAddStringField(bw, "email", telem->email); dnodeAddStringField(bw, "email", telem->email);
} }
static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) { static void dnodeAddRuntimeInfo(SDnTelem* telem, SBufferWriter* bw) {
SMnodeStat stat = {0}; SMnodeStat stat = {0};
if (mnodeGetStatistics(&stat) != 0) { if (mnodeGetStatistics(&stat) != 0) {
return; return;
@ -179,7 +179,7 @@ static void dnodeAddRuntimeInfo(DnTelem* telem, SBufferWriter* bw) {
dnodeAddIntField(bw, "compStorage", stat.compStorage); dnodeAddIntField(bw, "compStorage", stat.compStorage);
} }
static void dnodeSendTelemetryReport(DnTelem* telem) { static void dnodeSendTelemetryReport(SDnTelem* telem) {
char buf[128] = {0}; char buf[128] = {0};
uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER); uint32_t ip = taosGetIpv4FromFqdn(TELEMETRY_SERVER);
if (ip == 0xffffffff) { if (ip == 0xffffffff) {
@ -192,7 +192,7 @@ static void dnodeSendTelemetryReport(DnTelem* telem) {
return; return;
} }
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
SBufferWriter bw = tbufInitWriter(NULL, false); SBufferWriter bw = tbufInitWriter(NULL, false);
dnodeBeginObject(&bw); dnodeBeginObject(&bw);
dnodeAddStringField(&bw, "instanceId", dnode->cfg->clusterId); dnodeAddStringField(&bw, "instanceId", dnode->cfg->clusterId);
@ -227,7 +227,7 @@ static void dnodeSendTelemetryReport(DnTelem* telem) {
} }
static void* dnodeTelemThreadFp(void* param) { static void* dnodeTelemThreadFp(void* param) {
DnTelem* telem = param; SDnTelem* telem = param;
struct timespec end = {0}; struct timespec end = {0};
clock_gettime(CLOCK_REALTIME, &end); clock_gettime(CLOCK_REALTIME, &end);
@ -253,7 +253,7 @@ static void* dnodeTelemThreadFp(void* param) {
return NULL; return NULL;
} }
static void dnodeGetEmail(DnTelem* telem, char* filepath) { static void dnodeGetEmail(SDnTelem* telem, char* filepath) {
int32_t fd = taosOpenFileRead(filepath); int32_t fd = taosOpenFileRead(filepath);
if (fd < 0) { if (fd < 0) {
return; return;
@ -266,8 +266,8 @@ static void dnodeGetEmail(DnTelem* telem, char* filepath) {
taosCloseFile(fd); taosCloseFile(fd);
} }
int32_t dnodeInitTelem(DnTelem** out) { int32_t dnodeInitTelem(SDnTelem** out) {
DnTelem* telem = calloc(1, sizeof(DnTelem)); SDnTelem* telem = calloc(1, sizeof(SDnTelem));
if (telem == NULL) return -1; if (telem == NULL) return -1;
telem->enable = tsEnableTelemetryReporting; telem->enable = tsEnableTelemetryReporting;
@ -296,8 +296,8 @@ int32_t dnodeInitTelem(DnTelem** out) {
return 0; return 0;
} }
void dnodeCleanupTelem(DnTelem** out) { void dnodeCleanupTelem(SDnTelem** out) {
DnTelem* telem = *out; SDnTelem* telem = *out;
*out = NULL; *out = NULL;
if (!telem->enable) { if (!telem->enable) {

View File

@ -14,7 +14,7 @@
*/ */
/* this file is mainly responsible for the communication between DNODEs. Each /* this file is mainly responsible for the communication between DNODEs. Each
* dnode works as both server and client. Dnode may send status, grant, config * dnode works as both server and client. SDnode may send status, grant, config
* messages to mnode, mnode may send create/alter/drop table/vnode messages * messages to mnode, mnode may send create/alter/drop table/vnode messages
* to dnode. All theses messages are handled from here * to dnode. All theses messages are handled from here
*/ */
@ -30,7 +30,7 @@
#include "mnode.h" #include "mnode.h"
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode * dnode = dnodeInst(); SDnode * dnode = dnodeInst();
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
@ -64,7 +64,7 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
} }
} }
int32_t dnodeInitServer(DnTrans *trans) { int32_t dnodeInitServer(SDnTrans *trans) {
trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg;
@ -106,7 +106,7 @@ int32_t dnodeInitServer(DnTrans *trans) {
return 0; return 0;
} }
void dnodeCleanupServer(DnTrans *trans) { void dnodeCleanupServer(SDnTrans *trans) {
if (trans->serverRpc) { if (trans->serverRpc) {
rpcClose(trans->serverRpc); rpcClose(trans->serverRpc);
trans->serverRpc = NULL; trans->serverRpc = NULL;
@ -115,7 +115,7 @@ void dnodeCleanupServer(DnTrans *trans) {
} }
static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) { if (dnode->main->runStatus == TD_RUN_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return; if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("msg:%p is ignored since dnode is stopping", pMsg); dTrace("msg:%p is ignored since dnode is stopping", pMsg);
@ -141,7 +141,7 @@ static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
int32_t dnodeInitClient(DnTrans *trans) { int32_t dnodeInitClient(SDnTrans *trans) {
trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; trans->peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; trans->peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg;
trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; trans->peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg;
@ -186,7 +186,7 @@ int32_t dnodeInitClient(DnTrans *trans) {
return 0; return 0;
} }
void dnodeCleanupClient(DnTrans *trans) { void dnodeCleanupClient(SDnTrans *trans) {
if (trans->clientRpc) { if (trans->clientRpc) {
rpcClose(trans->clientRpc); rpcClose(trans->clientRpc);
trans->clientRpc = NULL; trans->clientRpc = NULL;
@ -195,7 +195,7 @@ void dnodeCleanupClient(DnTrans *trans) {
} }
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
Dnode * dnode = dnodeInst(); SDnode * dnode = dnodeInst();
SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
if (pMsg->pCont == NULL) return; if (pMsg->pCont == NULL) return;
@ -213,7 +213,7 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return; return;
} }
DnTrans *trans = dnode->trans; SDnTrans *trans = dnode->trans;
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&trans->queryReqNum, 1); atomic_fetch_add_32(&trans->queryReqNum, 1);
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) {
@ -247,26 +247,26 @@ static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secr
} }
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL); rpcSendRequest(dnode->trans->clientRpc, epSet, rpcMsg, NULL);
} }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
Dnode * dnode = dnodeInst(); SDnode * dnode = dnodeInst();
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(dnode->meps, &epSet); dnodeGetEpSetForPeer(dnode->meps, &epSet);
dnodeSendMsgToDnode(&epSet, rpcMsg); dnodeSendMsgToDnode(&epSet, rpcMsg);
} }
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
Dnode * dnode = dnodeInst(); SDnode * dnode = dnodeInst();
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(dnode->meps, &epSet); dnodeGetEpSetForPeer(dnode->meps, &epSet);
rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(dnode->trans->clientRpc, &epSet, rpcMsg, rpcRsp);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) { void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
Dnode *dnode = dnodeInst(); SDnode *dnode = dnodeInst();
rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp); rpcSendRecv(dnode->trans->clientRpc, epSet, rpcMsg, rpcRsp);
} }
@ -303,7 +303,7 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
return rpcRsp.code; return rpcRsp.code;
} }
int32_t dnodeInitShell(DnTrans *trans) { int32_t dnodeInitShell(SDnTrans *trans) {
trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; trans->shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
trans->shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; trans->shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
trans->shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; trans->shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
@ -376,15 +376,15 @@ int32_t dnodeInitShell(DnTrans *trans) {
return 0; return 0;
} }
void dnodeCleanupShell(DnTrans *trans) { void dnodeCleanupShell(SDnTrans *trans) {
if (trans->shellRpc) { if (trans->shellRpc) {
rpcClose(trans->shellRpc); rpcClose(trans->shellRpc);
trans->shellRpc = NULL; trans->shellRpc = NULL;
} }
} }
int32_t dnodeInitTrans(DnTrans **out) { int32_t dnodeInitTrans(SDnTrans **out) {
DnTrans *trans = calloc(1, sizeof(DnTrans)); SDnTrans *trans = calloc(1, sizeof(SDnTrans));
if (trans == NULL) return -1; if (trans == NULL) return -1;
*out = trans; *out = trans;
@ -404,8 +404,8 @@ int32_t dnodeInitTrans(DnTrans **out) {
return 0; return 0;
} }
void dnodeCleanupTrans(DnTrans **out) { void dnodeCleanupTrans(SDnTrans **out) {
DnTrans* trans = *out; SDnTrans* trans = *out;
*out = NULL; *out = NULL;
dnodeCleanupShell(trans); dnodeCleanupShell(trans);