commit
ca3a0b3e02
|
@ -54,4 +54,4 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void bmInitMsgHandles(SMgmtWrapper *pWrapper) {}
|
||||
void bmInitMsgHandle(SMgmtWrapper *pWrapper) {}
|
||||
|
|
|
@ -117,7 +117,7 @@ void bmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
|||
mgmtFp.dropMsgFp = bmProcessDropReq;
|
||||
mgmtFp.requiredFp = bmRequire;
|
||||
|
||||
bmInitMsgHandles(pWrapper);
|
||||
bmInitMsgHandle(pWrapper);
|
||||
pWrapper->name = "bnode";
|
||||
pWrapper->fp = mgmtFp;
|
||||
}
|
||||
|
|
|
@ -171,7 +171,7 @@ static int32_t dmProcessDropNodeMsg(SDnode *pDnode, ENodeType ntype, SNodeMsg *p
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
|
||||
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg) {
|
||||
switch (pMsg->rpcMsg.msgType) {
|
||||
case TDMT_DND_CREATE_MNODE:
|
||||
return dmProcessCreateNodeMsg(pDnode, MNODE, pMsg);
|
||||
|
@ -195,37 +195,21 @@ int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
||||
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
|
||||
}
|
||||
|
||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("startup req is received");
|
||||
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
|
||||
dndGetStartup(pDnode, pStartup);
|
||||
|
||||
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
|
||||
void dmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmPutMsgToStatusWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmPutMsgToMgmtWorker, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessStatusMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dndInt.h"
|
||||
|
||||
static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
|
||||
static int32_t dmGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
|
||||
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
||||
pInfo->logdir.size = tsLogSpace.size;
|
||||
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
||||
|
@ -30,14 +30,14 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
||||
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
||||
pInfo->protocol = 1;
|
||||
pInfo->dnode_id = pDnode->dnodeId;
|
||||
pInfo->cluster_id = pDnode->clusterId;
|
||||
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
|
||||
}
|
||||
|
||||
static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
||||
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
||||
pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f);
|
||||
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system);
|
||||
taosGetCpuCores(&pInfo->cpu_cores);
|
||||
|
@ -63,7 +63,7 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
void dndSendMonitorReport(SDnode *pDnode) {
|
||||
void dmSendMonitorReport(SDnode *pDnode) {
|
||||
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
||||
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
|
||||
|
||||
|
@ -71,7 +71,7 @@ void dndSendMonitorReport(SDnode *pDnode) {
|
|||
if (pMonitor == NULL) return;
|
||||
|
||||
SMonBasicInfo basicInfo = {0};
|
||||
dndGetMonitorBasicInfo(pDnode, &basicInfo);
|
||||
dmGetMonitorBasicInfo(pDnode, &basicInfo);
|
||||
monSetBasicInfo(pMonitor, &basicInfo);
|
||||
|
||||
SMonClusterInfo clusterInfo = {0};
|
||||
|
@ -89,11 +89,11 @@ void dndSendMonitorReport(SDnode *pDnode) {
|
|||
}
|
||||
|
||||
SMonDnodeInfo dnodeInfo = {0};
|
||||
dndGetMonitorDnodeInfo(pDnode, &dnodeInfo);
|
||||
dmGetMonitorDnodeInfo(pDnode, &dnodeInfo);
|
||||
monSetDnodeInfo(pMonitor, &dnodeInfo);
|
||||
|
||||
SMonDiskInfo diskInfo = {0};
|
||||
if (dndGetMonitorDiskInfo(pDnode, &diskInfo) == 0) {
|
||||
if (dmGetMonitorDiskInfo(pDnode, &diskInfo) == 0) {
|
||||
monSetDiskInfo(pMonitor, &diskInfo);
|
||||
}
|
||||
|
|
@ -40,7 +40,7 @@ static void *dmThreadRoutine(void *param) {
|
|||
|
||||
float monitorInterval = (curTime - lastMonitorTime) / 1000.0f;
|
||||
if (monitorInterval >= tsMonitorInterval) {
|
||||
dndSendMonitorReport(pDnode);
|
||||
dmSendMonitorReport(pDnode);
|
||||
lastMonitorTime = curTime;
|
||||
}
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
|||
code = dmProcessGrantRsp(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
code = dmProcessCDnodeMsg(pMgmt->pDnode, pMsg);
|
||||
code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -122,7 +122,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
|
|||
dDebug("dnode workers are closed");
|
||||
}
|
||||
|
||||
int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
|
||||
|
||||
|
@ -131,7 +131,7 @@ int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||
SSingleWorker *pWorker = &pMgmt->statusWorker;
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
#include "tconfig.h"
|
||||
|
||||
static struct {
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef _TD_DND_BNODE_INT_H_
|
||||
#define _TD_DND_BNODE_INT_H_
|
||||
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
#include "bnode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -35,8 +35,8 @@ typedef struct SBnodeMgmt {
|
|||
int32_t bmOpen(SMgmtWrapper *pWrapper);
|
||||
int32_t bmDrop(SMgmtWrapper *pWrapper);
|
||||
|
||||
// bmMsg.c
|
||||
void bmInitMsgHandles(SMgmtWrapper *pWrapper);
|
||||
// bmHandle.c
|
||||
void bmInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef _TD_DND_DNODE_INT_H_
|
||||
#define _TD_DND_DNODE_INT_H_
|
||||
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -39,12 +39,6 @@ typedef struct SDnodeMgmt {
|
|||
SMgmtWrapper *pWrapper;
|
||||
} SDnodeMgmt;
|
||||
|
||||
// dmInt.c
|
||||
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
|
||||
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
|
||||
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
|
||||
|
||||
// dmFile.c
|
||||
int32_t dmReadFile(SDnodeMgmt *pMgmt);
|
||||
int32_t dmWriteFile(SDnodeMgmt *pMgmt);
|
||||
|
@ -57,14 +51,17 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
|||
int32_t dmProcessStatusRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||
int32_t dmProcessCDnodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
||||
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg);
|
||||
|
||||
// dmMonitor.c
|
||||
void dmSendMonitorReport(SDnode *pDnode);
|
||||
|
||||
// dmWorker.c
|
||||
int32_t dmStartThread(SDnodeMgmt *pMgmt);
|
||||
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
|
||||
void dmStopWorker(SDnodeMgmt *pMgmt);
|
||||
int32_t dmPutMsgToMgmtWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t dmPutMsgToStatusWorker(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1,230 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DND_H_
|
||||
#define _TD_DND_H_
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "cJSON.h"
|
||||
#include "tcache.h"
|
||||
#include "tcrc32c.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "thash.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tlog.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tprocess.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "tthread.h"
|
||||
#include "ttime.h"
|
||||
#include "tworker.h"
|
||||
|
||||
#include "dnode.h"
|
||||
#include "monitor.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define dFatal(...) \
|
||||
{ \
|
||||
if (dDebugFlag & DEBUG_FATAL) { \
|
||||
taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define dError(...) \
|
||||
{ \
|
||||
if (dDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define dWarn(...) \
|
||||
{ \
|
||||
if (dDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define dInfo(...) \
|
||||
{ \
|
||||
if (dDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define dDebug(...) \
|
||||
{ \
|
||||
if (dDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
#define dTrace(...) \
|
||||
{ \
|
||||
if (dDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
|
||||
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
|
||||
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
|
||||
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
|
||||
|
||||
typedef struct SMgmtFp SMgmtFp;
|
||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||
typedef struct SMsgHandle SMsgHandle;
|
||||
typedef struct SDnodeMgmt SDnodeMgmt;
|
||||
typedef struct SVnodesMgmt SVnodesMgmt;
|
||||
typedef struct SMnodeMgmt SMnodeMgmt;
|
||||
typedef struct SQnodeMgmt SQnodeMgmt;
|
||||
typedef struct SSnodeMgmt SSnodeMgmt;
|
||||
typedef struct SBnodeMgmt SBnodeMgmt;
|
||||
|
||||
typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
|
||||
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
|
||||
typedef int32_t (*StartNodeFp)(SMgmtWrapper *pWrapper);
|
||||
typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
|
||||
|
||||
typedef struct SMsgHandle {
|
||||
SMgmtWrapper *pQndWrapper;
|
||||
SMgmtWrapper *pMndWrapper;
|
||||
SMgmtWrapper *pWrapper;
|
||||
} SMsgHandle;
|
||||
|
||||
typedef struct SMgmtFp {
|
||||
OpenNodeFp openFp;
|
||||
CloseNodeFp closeFp;
|
||||
StartNodeFp startFp;
|
||||
CreateNodeFp createMsgFp;
|
||||
DropNodeFp dropMsgFp;
|
||||
RequireNodeFp requiredFp;
|
||||
} SMgmtFp;
|
||||
|
||||
typedef struct SMgmtWrapper {
|
||||
const char *name;
|
||||
char *path;
|
||||
int32_t refCount;
|
||||
SRWLatch latch;
|
||||
ENodeType ntype;
|
||||
bool deployed;
|
||||
bool required;
|
||||
EProcType procType;
|
||||
int32_t procId;
|
||||
SProcObj *pProc;
|
||||
SShm shm;
|
||||
void *pMgmt;
|
||||
SDnode *pDnode;
|
||||
SMgmtFp fp;
|
||||
int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
|
||||
NodeMsgFp msgFps[TDMT_MAX];
|
||||
} SMgmtWrapper;
|
||||
|
||||
typedef struct {
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
SMsgHandle msgHandles[TDMT_MAX];
|
||||
} STransMgmt;
|
||||
|
||||
typedef struct SDnode {
|
||||
int64_t clusterId;
|
||||
int32_t dnodeId;
|
||||
int32_t numOfSupportVnodes;
|
||||
int64_t rebootTime;
|
||||
char *localEp;
|
||||
char *localFqdn;
|
||||
char *firstEp;
|
||||
char *secondEp;
|
||||
char *dataDir;
|
||||
SDiskCfg *disks;
|
||||
int32_t numOfDisks;
|
||||
uint16_t serverPort;
|
||||
bool dropped;
|
||||
ENodeType ntype;
|
||||
EDndStatus status;
|
||||
EDndEvent event;
|
||||
SStartupReq startup;
|
||||
TdFilePtr lockfile;
|
||||
STransMgmt trans;
|
||||
SMgmtWrapper wrappers[NODE_MAX];
|
||||
} SDnode;
|
||||
|
||||
// dndFile.h
|
||||
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
|
||||
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
|
||||
|
||||
// dndInt.h
|
||||
EDndStatus dndGetStatus(SDnode *pDnode);
|
||||
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
|
||||
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
|
||||
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
|
||||
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
|
||||
|
||||
// dndMonitor.h
|
||||
void dndSendMonitorReport(SDnode *pDnode);
|
||||
|
||||
// dndStr.h
|
||||
const char *dndStatStr(EDndStatus stat);
|
||||
const char *dndNodeLogStr(ENodeType ntype);
|
||||
const char *dndNodeProcStr(ENodeType ntype);
|
||||
const char *dndEventStr(EDndEvent ev);
|
||||
|
||||
// dndTransport.h
|
||||
int32_t dndInitTrans(SDnode *pDnode);
|
||||
void dndCleanupTrans(SDnode *pDnode);
|
||||
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
|
||||
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
|
||||
int32_t dndInitMsgHandle(SDnode *pDnode);
|
||||
|
||||
// mgmt
|
||||
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void bmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void qmSetMgmtFp(SMgmtWrapper *pMgmt);
|
||||
void smSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void vmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void mmSetMgmtFp(SMgmtWrapper *pMgmt);
|
||||
|
||||
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
|
||||
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
|
||||
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
|
||||
|
||||
typedef struct {
|
||||
int32_t openVnodes;
|
||||
int32_t totalVnodes;
|
||||
int32_t masterNum;
|
||||
int64_t numOfSelectReqs;
|
||||
int64_t numOfInsertReqs;
|
||||
int64_t numOfInsertSuccessReqs;
|
||||
int64_t numOfBatchInsertReqs;
|
||||
int64_t numOfBatchInsertSuccessReqs;
|
||||
} SVnodesStat;
|
||||
|
||||
void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
|
||||
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
|
||||
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
|
||||
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DND_H_*/
|
|
@ -16,34 +16,189 @@
|
|||
#ifndef _TD_DND_INT_H_
|
||||
#define _TD_DND_INT_H_
|
||||
|
||||
#include "dnd.h"
|
||||
#include "os.h"
|
||||
|
||||
#include "cJSON.h"
|
||||
#include "tcache.h"
|
||||
#include "tcrc32c.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "thash.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tlog.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tprocess.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "tthread.h"
|
||||
#include "ttime.h"
|
||||
#include "tworker.h"
|
||||
|
||||
#include "dnode.h"
|
||||
#include "monitor.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// dndEnv.h
|
||||
int32_t dndInit();
|
||||
void dndCleanup();
|
||||
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
// dndExec.h
|
||||
typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
|
||||
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
|
||||
typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStatus;
|
||||
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
|
||||
|
||||
typedef struct SMgmtFp SMgmtFp;
|
||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||
typedef struct SMsgHandle SMsgHandle;
|
||||
typedef struct SDnodeMgmt SDnodeMgmt;
|
||||
typedef struct SVnodesMgmt SVnodesMgmt;
|
||||
typedef struct SMnodeMgmt SMnodeMgmt;
|
||||
typedef struct SQnodeMgmt SQnodeMgmt;
|
||||
typedef struct SSnodeMgmt SSnodeMgmt;
|
||||
typedef struct SBnodeMgmt SBnodeMgmt;
|
||||
|
||||
typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
|
||||
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
|
||||
typedef int32_t (*StartNodeFp)(SMgmtWrapper *pWrapper);
|
||||
typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
|
||||
|
||||
typedef struct SMsgHandle {
|
||||
SMgmtWrapper *pQndWrapper;
|
||||
SMgmtWrapper *pMndWrapper;
|
||||
SMgmtWrapper *pWrapper;
|
||||
} SMsgHandle;
|
||||
|
||||
typedef struct SMgmtFp {
|
||||
OpenNodeFp openFp;
|
||||
CloseNodeFp closeFp;
|
||||
StartNodeFp startFp;
|
||||
CreateNodeFp createMsgFp;
|
||||
DropNodeFp dropMsgFp;
|
||||
RequireNodeFp requiredFp;
|
||||
} SMgmtFp;
|
||||
|
||||
typedef struct SMgmtWrapper {
|
||||
const char *name;
|
||||
char *path;
|
||||
int32_t refCount;
|
||||
SRWLatch latch;
|
||||
ENodeType ntype;
|
||||
bool deployed;
|
||||
bool required;
|
||||
EProcType procType;
|
||||
int32_t procId;
|
||||
SProcObj *pProc;
|
||||
SShm shm;
|
||||
void *pMgmt;
|
||||
SDnode *pDnode;
|
||||
SMgmtFp fp;
|
||||
int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
|
||||
NodeMsgFp msgFps[TDMT_MAX];
|
||||
} SMgmtWrapper;
|
||||
|
||||
typedef struct {
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
SMsgHandle msgHandles[TDMT_MAX];
|
||||
} STransMgmt;
|
||||
|
||||
typedef struct SDnode {
|
||||
int64_t clusterId;
|
||||
int32_t dnodeId;
|
||||
int32_t numOfSupportVnodes;
|
||||
int64_t rebootTime;
|
||||
char *localEp;
|
||||
char *localFqdn;
|
||||
char *firstEp;
|
||||
char *secondEp;
|
||||
char *dataDir;
|
||||
SDiskCfg *disks;
|
||||
int32_t numOfDisks;
|
||||
uint16_t serverPort;
|
||||
bool dropped;
|
||||
ENodeType ntype;
|
||||
EDndStatus status;
|
||||
EDndEvent event;
|
||||
SStartupReq startup;
|
||||
TdFilePtr lockfile;
|
||||
STransMgmt trans;
|
||||
SMgmtWrapper wrappers[NODE_MAX];
|
||||
} SDnode;
|
||||
|
||||
// dndEnv.c
|
||||
const char *dndStatStr(EDndStatus stat);
|
||||
const char *dndNodeLogStr(ENodeType ntype);
|
||||
const char *dndNodeProcStr(ENodeType ntype);
|
||||
const char *dndEventStr(EDndEvent ev);
|
||||
|
||||
// dndExec.c
|
||||
int32_t dndOpenNode(SMgmtWrapper *pWrapper);
|
||||
void dndCloseNode(SMgmtWrapper *pWrapper);
|
||||
int32_t dndRun(SDnode *pDnode);
|
||||
|
||||
// dndInt.c
|
||||
SDnode *dndCreate(const SDnodeOpt *pOption);
|
||||
void dndClose(SDnode *pDnode);
|
||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
|
||||
|
||||
// dndMsg.c
|
||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
// dndFile.c
|
||||
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
|
||||
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed);
|
||||
TdFilePtr dndCheckRunning(const char *dataDir);
|
||||
int32_t dndReadShmFile(SDnode *pDnode);
|
||||
int32_t dndWriteShmFile(SDnode *pDnode);
|
||||
|
||||
// dndInt.c
|
||||
EDndStatus dndGetStatus(SDnode *pDnode);
|
||||
void dndSetStatus(SDnode *pDnode, EDndStatus stat);
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
|
||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType);
|
||||
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper);
|
||||
void dndReleaseWrapper(SMgmtWrapper *pWrapper);
|
||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event);
|
||||
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
|
||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
|
||||
// dndTransport.c
|
||||
int32_t dndInitTrans(SDnode *pDnode);
|
||||
void dndCleanupTrans(SDnode *pDnode);
|
||||
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
|
||||
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
|
||||
int32_t dndInitMsgHandle(SDnode *pDnode);
|
||||
|
||||
// mgmt
|
||||
void dmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void bmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void qmSetMgmtFp(SMgmtWrapper *pMgmt);
|
||||
void smSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void vmSetMgmtFp(SMgmtWrapper *pWrapper);
|
||||
void mmSetMgmtFp(SMgmtWrapper *pMgmt);
|
||||
|
||||
void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
|
||||
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
|
||||
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
|
||||
|
||||
typedef struct {
|
||||
int32_t openVnodes;
|
||||
int32_t totalVnodes;
|
||||
int32_t masterNum;
|
||||
int64_t numOfSelectReqs;
|
||||
int64_t numOfInsertReqs;
|
||||
int64_t numOfInsertSuccessReqs;
|
||||
int64_t numOfBatchInsertReqs;
|
||||
int64_t numOfBatchInsertSuccessReqs;
|
||||
} SVnodesStat;
|
||||
|
||||
void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
|
||||
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
|
||||
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
|
||||
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef _TD_DND_MNODE_INT_H_
|
||||
#define _TD_DND_MNODE_INT_H_
|
||||
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
#include "mnode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -46,8 +46,8 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq);
|
|||
int32_t mmDrop(SMgmtWrapper *pWrapper);
|
||||
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
|
||||
|
||||
// mmMsg.c
|
||||
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
|
||||
// mmHandle.c
|
||||
void mmInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef _TD_DND_QNODE_INT_H_
|
||||
#define _TD_DND_QNODE_INT_H_
|
||||
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
#include "qnode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -36,8 +36,8 @@ typedef struct SQnodeMgmt {
|
|||
int32_t qmOpen(SMgmtWrapper *pWrapper);
|
||||
int32_t qmDrop(SMgmtWrapper *pWrapper);
|
||||
|
||||
// qmMsg.c
|
||||
void qmInitMsgHandles(SMgmtWrapper *pWrapper);
|
||||
// qmHandle.c
|
||||
void qmInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef _TD_DND_SNODE_INT_H_
|
||||
#define _TD_DND_SNODE_INT_H_
|
||||
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
#include "snode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -38,8 +38,8 @@ typedef struct SSnodeMgmt {
|
|||
int32_t smOpen(SMgmtWrapper *pWrapper);
|
||||
int32_t smDrop(SMgmtWrapper *pWrapper);
|
||||
|
||||
// smMsg.c
|
||||
void smInitMsgHandles(SMgmtWrapper *pWrapper);
|
||||
// smHandle.c
|
||||
void smInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#define _TD_DND_VNODES_INT_H_
|
||||
|
||||
#include "sync.h"
|
||||
#include "dnd.h"
|
||||
#include "dndInt.h"
|
||||
#include "vnode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -84,8 +84,8 @@ void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
|||
int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
|
||||
void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||
|
||||
// vmMsg.c
|
||||
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
|
||||
// vmHandle.c
|
||||
void vmInitMsgHandle(SMgmtWrapper *pWrapper);
|
||||
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
||||
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
||||
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
|
||||
|
|
|
@ -211,3 +211,19 @@ void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
|||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||
pStartup->finished = 0;
|
||||
}
|
||||
|
||||
static void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
||||
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
|
||||
}
|
||||
|
||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("startup req is received");
|
||||
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
|
||||
dndGetStartup(pDnode, pStartup);
|
||||
|
||||
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
|
@ -73,7 +73,7 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
|
|
|
@ -236,7 +236,7 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
|||
mgmtFp.dropMsgFp = mmProcessDropReq;
|
||||
mgmtFp.requiredFp = mmRequire;
|
||||
|
||||
mmInitMsgHandles(pWrapper);
|
||||
mmInitMsgHandle(pWrapper);
|
||||
pWrapper->name = "mnode";
|
||||
pWrapper->fp = mgmtFp;
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE);
|
||||
|
|
|
@ -120,7 +120,7 @@ void qmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
|||
mgmtFp.dropMsgFp = qmProcessDropReq;
|
||||
mgmtFp.requiredFp = qmRequire;
|
||||
|
||||
qmInitMsgHandles(pWrapper);
|
||||
qmInitMsgHandle(pWrapper);
|
||||
pWrapper->name = "qnode";
|
||||
pWrapper->fp = mgmtFp;
|
||||
}
|
||||
|
|
|
@ -54,7 +54,7 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
void smInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by SNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE);
|
||||
|
|
|
@ -117,7 +117,7 @@ void smSetMgmtFp(SMgmtWrapper *pWrapper) {
|
|||
mgmtFp.dropMsgFp = smProcessDropReq;
|
||||
mgmtFp.requiredFp = smRequire;
|
||||
|
||||
smInitMsgHandles(pWrapper);
|
||||
smInitMsgHandle(pWrapper);
|
||||
pWrapper->name = "snode";
|
||||
pWrapper->fp = mgmtFp;
|
||||
}
|
||||
|
|
|
@ -238,7 +238,7 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
|
||||
|
|
|
@ -339,7 +339,7 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
|
|||
mgmtFp.closeFp = vmCleanup;
|
||||
mgmtFp.requiredFp = vmRequire;
|
||||
|
||||
vmInitMsgHandles(pWrapper);
|
||||
vmInitMsgHandle(pWrapper);
|
||||
pWrapper->name = "vnode";
|
||||
pWrapper->fp = mgmtFp;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue