refactor: make more object global
This commit is contained in:
parent
36fe62fbd2
commit
46d566d345
|
@ -66,6 +66,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct SMgmtWrapper {
|
typedef struct SMgmtWrapper {
|
||||||
SMgmtFunc func;
|
SMgmtFunc func;
|
||||||
|
struct SDnode *pDnode;
|
||||||
void *pMgmt;
|
void *pMgmt;
|
||||||
const char *name;
|
const char *name;
|
||||||
char *path;
|
char *path;
|
||||||
|
@ -125,10 +126,7 @@ typedef struct SDnode {
|
||||||
|
|
||||||
// dmEnv.c
|
// dmEnv.c
|
||||||
SDnode *dmInstance();
|
SDnode *dmInstance();
|
||||||
bool dmNotRunning();
|
|
||||||
void dmReportStartup(const char *pName, const char *pDesc);
|
void dmReportStartup(const char *pName, const char *pDesc);
|
||||||
void *dmGetClientRpc();
|
|
||||||
void dmGetMnodeEpSetGlobal(SEpSet *pEpSet);
|
|
||||||
|
|
||||||
// dmMgmt.c
|
// dmMgmt.c
|
||||||
int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype);
|
int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype);
|
||||||
|
@ -164,7 +162,7 @@ int32_t dmInitServer(SDnode *pDnode);
|
||||||
void dmCleanupServer(SDnode *pDnode);
|
void dmCleanupServer(SDnode *pDnode);
|
||||||
int32_t dmInitClient(SDnode *pDnode);
|
int32_t dmInitClient(SDnode *pDnode);
|
||||||
void dmCleanupClient(SDnode *pDnode);
|
void dmCleanupClient(SDnode *pDnode);
|
||||||
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
|
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode);
|
int32_t dmInitMsgHandle(SDnode *pDnode);
|
||||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,9 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
|
|
||||||
SDnode global = {0};
|
static SDnode global = {0};
|
||||||
|
|
||||||
|
SDnode *dmInstance() { return &global; }
|
||||||
|
|
||||||
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
|
static int32_t dmCheckRepeatInit(SDnode *pDnode) {
|
||||||
if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
|
if (atomic_val_compare_exchange_8(&pDnode->once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
|
||||||
|
@ -49,10 +51,10 @@ static int32_t dmInitMonitor() {
|
||||||
|
|
||||||
int32_t dmInit(int8_t rtype) {
|
int32_t dmInit(int8_t rtype) {
|
||||||
dInfo("start to init env");
|
dInfo("start to init env");
|
||||||
if (dmCheckRepeatInit(&global) != 0) return -1;
|
if (dmCheckRepeatInit(dmInstance()) != 0) return -1;
|
||||||
if (dmInitSystem() != 0) return -1;
|
if (dmInitSystem() != 0) return -1;
|
||||||
if (dmInitMonitor() != 0) return -1;
|
if (dmInitMonitor() != 0) return -1;
|
||||||
if (dmInitDnode(&global, rtype) != 0) return -1;
|
if (dmInitDnode(dmInstance(), rtype) != 0) return -1;
|
||||||
|
|
||||||
dInfo("env is initialized");
|
dInfo("env is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -69,7 +71,7 @@ static int32_t dmCheckRepeatCleanup(SDnode *pDnode) {
|
||||||
void dmCleanup() {
|
void dmCleanup() {
|
||||||
dDebug("start to cleanup env");
|
dDebug("start to cleanup env");
|
||||||
if (dmCheckRepeatCleanup != 0) return;
|
if (dmCheckRepeatCleanup != 0) return;
|
||||||
dmCleanupDnode(&global);
|
dmCleanupDnode(dmInstance());
|
||||||
monCleanup();
|
monCleanup();
|
||||||
syncCleanUp();
|
syncCleanUp();
|
||||||
walCleanUp();
|
walCleanUp();
|
||||||
|
@ -83,17 +85,17 @@ void dmCleanup() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmStop() {
|
void dmStop() {
|
||||||
SDnode *pDnode = &global;
|
SDnode *pDnode = dmInstance();
|
||||||
pDnode->stop = true;
|
pDnode->stop = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmRun() {
|
int32_t dmRun() {
|
||||||
SDnode *pDnode = &global;
|
SDnode *pDnode = dmInstance();
|
||||||
return dmRunDnode(pDnode);
|
return dmRunDnode(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
SDnode *pDnode = &global;
|
SDnode *pDnode = dmInstance();
|
||||||
|
|
||||||
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
|
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
|
||||||
if (pWrapper != NULL) {
|
if (pWrapper != NULL) {
|
||||||
|
@ -130,7 +132,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
SDnode *pDnode = &global;
|
SDnode *pDnode = dmInstance();
|
||||||
|
|
||||||
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
|
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, ntype);
|
||||||
if (pWrapper == NULL) {
|
if (pWrapper == NULL) {
|
||||||
|
@ -161,37 +163,27 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool dmIsNodeRequired(EDndNodeType ntype) {
|
static bool dmIsNodeRequired(EDndNodeType ntype) {
|
||||||
SDnode *pDnode = &global;
|
SDnode *pDnode = dmInstance();
|
||||||
return pDnode->wrappers[ntype].required;
|
return pDnode->wrappers[ntype].required;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||||
SDnode *pDnode = dmInstance();
|
|
||||||
|
|
||||||
SMgmtInputOpt opt = {
|
SMgmtInputOpt opt = {
|
||||||
.path = pWrapper->path,
|
.path = pWrapper->path,
|
||||||
.name = pWrapper->name,
|
.name = pWrapper->name,
|
||||||
.pData = &pDnode->data,
|
.pData = &pWrapper->pDnode->data,
|
||||||
.processCreateNodeFp = dmProcessCreateNodeReq,
|
.processCreateNodeFp = dmProcessCreateNodeReq,
|
||||||
.processDropNodeFp = dmProcessDropNodeReq,
|
.processDropNodeFp = dmProcessDropNodeReq,
|
||||||
.isNodeRequiredFp = dmIsNodeRequired,
|
.isNodeRequiredFp = dmIsNodeRequired,
|
||||||
};
|
};
|
||||||
|
|
||||||
opt.msgCb = dmGetMsgcb(pWrapper);
|
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
||||||
return opt;
|
return opt;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmReportStartup(const char *pName, const char *pDesc) {
|
void dmReportStartup(const char *pName, const char *pDesc) {
|
||||||
SStartupInfo *pStartup = &global.startup;
|
SStartupInfo *pStartup = &(dmInstance()->startup);
|
||||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||||
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
|
dDebug("step:%s, %s", pStartup->name, pStartup->desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
SDnode *dmInstance() { return &global; }
|
|
||||||
|
|
||||||
bool dmNotRunning() { return global.status != DND_STAT_RUNNING; }
|
|
||||||
|
|
||||||
void *dmGetClientRpc() { return global.trans.clientRpc; }
|
|
||||||
|
|
||||||
void dmGetMnodeEpSetGlobal(SEpSet *pEpSet) { dmGetMnodeEpSet(&global.data, pEpSet); }
|
|
|
@ -132,6 +132,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
|
||||||
|
|
||||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
|
pWrapper->pDnode = pDnode;
|
||||||
pWrapper->name = dmNodeName(ntype);
|
pWrapper->name = dmNodeName(ntype);
|
||||||
pWrapper->ntype = ntype;
|
pWrapper->ntype = ntype;
|
||||||
pWrapper->proc.wrapper = pWrapper;
|
pWrapper->proc.wrapper = pWrapper;
|
||||||
|
|
|
@ -64,7 +64,7 @@ static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
SDnode *pDnode = dmInstance();
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
|
|
||||||
if (taosMkDir(pWrapper->path) != 0) {
|
if (taosMkDir(pWrapper->path) != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
|
@ -195,8 +195,9 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
|
static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) {
|
||||||
|
SDnode *pDnode = dmInstance();
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
dmGetMnodeEpSetGlobal(&epSet);
|
dmGetMnodeEpSet(&pDnode->data, &epSet);
|
||||||
|
|
||||||
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
|
dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse);
|
||||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||||
|
@ -279,7 +280,6 @@ static inline void dmSendRedirectRsp(const SRpcMsg *pRsp, const SEpSet *pNewEpSe
|
||||||
|
|
||||||
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||||
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
|
||||||
|
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper->proc.ptype)) {
|
||||||
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
|
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_REGIST);
|
||||||
} else {
|
} else {
|
||||||
|
@ -289,22 +289,15 @@ static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
SMgmtWrapper *pWrapper = pHandle->wrapper;
|
SMgmtWrapper *pWrapper = pHandle->wrapper;
|
||||||
|
|
||||||
if (InChildProc(pWrapper->proc.ptype)) {
|
if (InChildProc(pWrapper->proc.ptype)) {
|
||||||
SRpcMsg msg = {.info = *pHandle, .code = type};
|
SRpcMsg msg = {.code = type, .info = *pHandle};
|
||||||
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
|
dmPutToProcPQueue(&pWrapper->proc, &msg, sizeof(SRpcMsg), NULL, 0, DND_FUNC_RELEASE);
|
||||||
} else {
|
} else {
|
||||||
rpcReleaseHandle(pHandle->handle, type);
|
rpcReleaseHandle(pHandle->handle, type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool rpcRfp(int32_t code) {
|
static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; }
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dmInitClient(SDnode *pDnode) {
|
int32_t dmInitClient(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
@ -345,8 +338,7 @@ void dmCleanupClient(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
|
static inline int32_t dmGetHideUserAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
||||||
char *ckey) {
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
char pass[TSDB_PASSWORD_LEN + 1] = {0};
|
||||||
|
|
||||||
|
@ -370,7 +362,7 @@ static inline int32_t dmGetHideUserAuth(SDnode *pDnode, char *user, char *spi, c
|
||||||
|
|
||||||
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
|
static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret,
|
||||||
char *ckey) {
|
char *ckey) {
|
||||||
if (dmGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
|
if (dmGetHideUserAuth(user, spi, encrypt, secret, ckey) == 0) {
|
||||||
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
|
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -410,7 +402,6 @@ int32_t dmInitServer(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
SRpcInit rpcInit = {0};
|
SRpcInit rpcInit = {0};
|
||||||
|
|
||||||
strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
|
strncpy(rpcInit.localFqdn, tsLocalFqdn, strlen(tsLocalFqdn));
|
||||||
rpcInit.localPort = tsServerPort;
|
rpcInit.localPort = tsServerPort;
|
||||||
rpcInit.label = "DND";
|
rpcInit.label = "DND";
|
||||||
|
@ -441,10 +432,9 @@ void dmCleanupServer(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
|
SMsgCb dmGetMsgcb(SDnode *pDnode) {
|
||||||
SDnode *pDnode = dmInstance();
|
|
||||||
SMsgCb msgCb = {
|
SMsgCb msgCb = {
|
||||||
.clientRpc = dmInstance()->trans.clientRpc,
|
.clientRpc = pDnode->trans.clientRpc,
|
||||||
.sendReqFp = dmSendReq,
|
.sendReqFp = dmSendReq,
|
||||||
.sendRspFp = dmSendRsp,
|
.sendRspFp = dmSendRsp,
|
||||||
.sendRedirectRspFp = dmSendRedirectRsp,
|
.sendRedirectRspFp = dmSendRedirectRsp,
|
||||||
|
|
Loading…
Reference in New Issue