commit
a811391a59
|
@ -40,87 +40,112 @@ enum {
|
|||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_TABLE, "create-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TABLE, "drop-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TABLE, "alter-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLE_META, "table-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TABLES_META, "tables-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY0, "dummy0" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" )
|
||||
|
||||
// message from mnode to dnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_TABLE, "drop-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_TABLE, "alter-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_VNODE, "create-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_VNODE, "drop-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_DROP_STABLE, "drop-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_STREAM, "alter-stream" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY4, "dummy4" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY5, "dummy5" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
|
||||
|
||||
|
||||
// message from client to mnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_ACCT, "alter-acct" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_ACCT, "drop-acct" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_USER, "create-user" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_USER, "alter-user" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TABLE, "create-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE, "retrieve" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_QUERY, "kill-query" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
|
||||
|
||||
// message from mnode to dnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE_IN, "create-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE_IN, "alter-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE_IN, "drop-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_VNODE_IN, "create-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_VNODE_IN, "alter-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_VNODE_IN, "drop-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_VNODE_IN, "sync-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE_IN, "compact-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_MNODE_IN, "create-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_MNODE_IN, "drop-mnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE_IN, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
|
||||
|
||||
// message from dnode to mnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_TABLE, "config-table" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_CONFIG_VNODE, "config-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY12, "dummy12" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY13, "dummy13" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY14, "dummy14" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY15, "dummy15" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY16, "dummy16" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY17, "dummy17" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY18, "dummy18" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY19, "dummy19" )
|
||||
|
||||
// message for topic
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
|
||||
//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
|
||||
|
||||
// message from client to mnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_ACCT, "alter-acct" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_ACCT, "drop-acct" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_USER, "create-user" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_USER, "alter-user" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_USER, "drop-user" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DNODE, "create-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONFIG_DNODE, "config-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DNODE, "drop-dnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_DB, "create-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_DB, "drop-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_USE_DB, "use-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_DB, "alter-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SYNC_DB, "sync-db" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_TOPIC, "create-topic" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TOPIC, "drop-topic" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_TOPIC, "alter-topic" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_FUNCTION, "create-function" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_FUNCTION, "alter-function" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STABLE, "create-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE, "retrieve" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC, "retrieve-func" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_COMPACT_VNODE, "compact-vnode" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY20, "dummy20" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY21, "dummy21" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY22, "dummy22" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY23, "dummy23" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY24, "dummy24" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY25, "dummy25" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY26, "dummy26" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY27, "dummy27" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY28, "dummy28" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY29, "dummy29" )
|
||||
|
||||
// message from dnode to mnode
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_STATUS, "status" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_GRANT, "grant" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DM_AUTH, "auth" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY30, "dummy30" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY31, "dummy31" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY32, "dummy32" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY33, "dummy33" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY34, "dummy34" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY35, "dummy35" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY36, "dummy36" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY37, "dummy37" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY38, "dummy38" )
|
||||
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY39, "dummy39" )
|
||||
|
||||
#ifndef TAOS_MESSAGE_C
|
||||
TSDB_MSG_TYPE_MAX // 147
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DNODE_H_
|
||||
#define _TD_DNODE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
struct SRpcEpSet;
|
||||
struct SRpcMsg;
|
||||
/**
|
||||
* Initialize and start the dnode module.
|
||||
*
|
||||
* @return Error code.
|
||||
*/
|
||||
int32_t dnodeInit();
|
||||
|
||||
/**
|
||||
* Stop and cleanup dnode module.
|
||||
*/
|
||||
void dnodeCleanup();
|
||||
|
||||
/**
|
||||
* Send messages to other dnodes, such as create vnode message.
|
||||
*
|
||||
* @param epSet, the endpoint list of the dnodes.
|
||||
* @param rpcMsg, message to be sent.
|
||||
*/
|
||||
void dnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
|
||||
/**
|
||||
* Send messages to mnode, such as config message.
|
||||
*
|
||||
* @param rpcMsg, message to be sent.
|
||||
*/
|
||||
void dnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
|
||||
|
||||
/**
|
||||
* Send redirect message to dnode or shell.
|
||||
*
|
||||
* @param rpcMsg, message to be sent.
|
||||
* @param forShell, used to identify whether to send to shell or dnode.
|
||||
*/
|
||||
void dnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
|
||||
|
||||
/**
|
||||
* Get the corresponding endpoint information from dnodeId.
|
||||
*
|
||||
* @param dnodeId, the id ot dnode.
|
||||
* @param ep, the endpoint of dnode.
|
||||
* @param fqdn, the fqdn of dnode.
|
||||
* @param port, the port of dnode.
|
||||
*/
|
||||
void dnodeGetEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
|
||||
|
||||
/**
|
||||
* Report the startup progress.
|
||||
*/
|
||||
void dnodeReportStartup(char *name, char *desc);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DNODE_H_*/
|
|
@ -16,93 +16,111 @@
|
|||
#ifndef _TD_VNODE_H_
|
||||
#define _TD_VNODE_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "trpc.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
/**
|
||||
* Send messages to other dnodes, such as create vnode message.
|
||||
*
|
||||
* @param epSet, the endpoint list of dnodes.
|
||||
* @param rpcMsg, message to be sent.
|
||||
*/
|
||||
void (*SendMsgToDnode)(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
|
||||
|
||||
/**
|
||||
* Send messages to mnode, such as config message.
|
||||
*
|
||||
* @param rpcMsg, message to be sent.
|
||||
*/
|
||||
void (*SendMsgToMnode)(struct SRpcMsg *rpcMsg);
|
||||
|
||||
/**
|
||||
* Get the corresponding endpoint information from dnodeId.
|
||||
*
|
||||
* @param dnodeId, the id ot dnode.
|
||||
* @param ep, the endpoint of dnode.
|
||||
* @param fqdn, the fqdn of dnode.
|
||||
* @param port, the port of dnode.
|
||||
*/
|
||||
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
|
||||
|
||||
/**
|
||||
* Report the startup progress.
|
||||
*/
|
||||
void (*ReportStartup)(char *name, char *desc);
|
||||
|
||||
} SVnodeFp;
|
||||
typedef struct SVnode SVnode;
|
||||
|
||||
typedef struct {
|
||||
SVnodeFp fp;
|
||||
} SVnodePara;
|
||||
char dbName[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
|
||||
int32_t cacheBlockSize; // MB
|
||||
int32_t totalBlocks;
|
||||
int32_t daysPerFile;
|
||||
int32_t daysToKeep0;
|
||||
int32_t daysToKeep1;
|
||||
int32_t daysToKeep2;
|
||||
int32_t minRowsPerFileBlock;
|
||||
int32_t maxRowsPerFileBlock;
|
||||
int8_t precision; // time resolution
|
||||
int8_t compression;
|
||||
int8_t cacheLastRow;
|
||||
int8_t update;
|
||||
int8_t quorum;
|
||||
int8_t replica;
|
||||
int8_t walLevel;
|
||||
int32_t fsyncPeriod; // millisecond
|
||||
SVnodeDesc replicas[TSDB_MAX_REPLICA];
|
||||
} SVnodeCfg;
|
||||
|
||||
typedef struct {
|
||||
int64_t totalStorage;
|
||||
int64_t compStorage;
|
||||
int64_t pointsWritten;
|
||||
int64_t tablesNum;
|
||||
} SVnodeStatisic;
|
||||
|
||||
typedef struct {
|
||||
int8_t syncRole;
|
||||
} SVnodeStatus;
|
||||
|
||||
typedef struct {
|
||||
int32_t accessState;
|
||||
} SVnodeAccess;
|
||||
|
||||
typedef struct SVnodeMsg {
|
||||
int32_t msgType;
|
||||
int32_t code;
|
||||
SRpcMsg rpcMsg; // original message from rpc
|
||||
int32_t contLen;
|
||||
char pCont[];
|
||||
} SVnodeMsg;
|
||||
|
||||
/**
|
||||
* Start initialize vnode module.
|
||||
*
|
||||
* @param para, initialization parameters.
|
||||
* @return Error code.
|
||||
*/
|
||||
int32_t vnodeInit(SVnodePara para);
|
||||
int32_t vnodeInit();
|
||||
|
||||
/**
|
||||
* Cleanup vnode module.
|
||||
*/
|
||||
void vnodeCleanup();
|
||||
|
||||
typedef struct {
|
||||
int32_t unused;
|
||||
} SVnodeStat;
|
||||
|
||||
/**
|
||||
* Get the statistical information of vnode.
|
||||
*
|
||||
* @param stat, statistical information.
|
||||
* @param pVnode,
|
||||
* @param pStat, statistical information.
|
||||
* @return Error Code.
|
||||
*/
|
||||
int32_t vnodeGetStatistics(SVnodeStat *stat);
|
||||
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat);
|
||||
|
||||
/**
|
||||
* Get the status of all vnodes.
|
||||
*
|
||||
* @param status, status msg.
|
||||
* @param pVnode,
|
||||
* @param status, status information.
|
||||
* @return Error Code.
|
||||
*/
|
||||
void vnodeGetStatus(struct SStatusMsg *status);
|
||||
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus);
|
||||
|
||||
/**
|
||||
* Set access permissions for all vnodes.
|
||||
* Operation functions of vnode
|
||||
*
|
||||
* @param access, access permissions of vnodes.
|
||||
* @param numOfVnodes, the size of vnodes.
|
||||
* @return Error Code.
|
||||
*/
|
||||
void vnodeSetAccess(struct SVgroupAccess *access, int32_t numOfVnodes);
|
||||
SVnode *vnodeOpen(int32_t vgId, const char *path);
|
||||
void vnodeClose(SVnode *pVnode);
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
|
||||
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg);
|
||||
int32_t vnodeDrop(SVnode *pVnode);
|
||||
int32_t vnodeCompact(SVnode *pVnode);
|
||||
int32_t vnodeSync(SVnode *pVnode);
|
||||
|
||||
/**
|
||||
* Interface for processing messages.
|
||||
*
|
||||
* @param msg, message to be processed.
|
||||
* @param pVnode,
|
||||
* @param pMsg, message to be processed.
|
||||
*
|
||||
*/
|
||||
void vnodeProcessMsg(SRpcMsg *msg);
|
||||
int32_t vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -37,21 +37,24 @@ shall be used to set up the protection.
|
|||
|
||||
*/
|
||||
|
||||
typedef void* taos_queue;
|
||||
typedef void* taos_qset;
|
||||
typedef void* taos_qall;
|
||||
typedef void *taos_queue;
|
||||
typedef void *taos_qset;
|
||||
typedef void *taos_qall;
|
||||
typedef void *(*FProcessItem)(void *pItem, void *ahandle);
|
||||
typedef void *(*FProcessItems)(taos_qall qall, int numOfItems, void *ahandle);
|
||||
|
||||
taos_queue taosOpenQueue();
|
||||
void taosCloseQueue(taos_queue);
|
||||
void taosSetQueueFp(taos_queue, FProcessItem, FProcessItems);
|
||||
void *taosAllocateQitem(int size);
|
||||
void taosFreeQitem(void *item);
|
||||
int taosWriteQitem(taos_queue, int type, void *item);
|
||||
int taosReadQitem(taos_queue, int *type, void **pitem);
|
||||
void taosFreeQitem(void *pItem);
|
||||
int taosWriteQitem(taos_queue, void *pItem);
|
||||
int taosReadQitem(taos_queue, void **pItem);
|
||||
|
||||
taos_qall taosAllocateQall();
|
||||
void taosFreeQall(taos_qall);
|
||||
int taosReadAllQitems(taos_queue, taos_qall);
|
||||
int taosGetQitem(taos_qall, int *type, void **pitem);
|
||||
int taosGetQitem(taos_qall, void **pItem);
|
||||
void taosResetQitems(taos_qall);
|
||||
|
||||
taos_qset taosOpenQset();
|
||||
|
@ -61,8 +64,8 @@ int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
|
|||
void taosRemoveFromQset(taos_qset, taos_queue);
|
||||
int taosGetQueueNumber(taos_qset);
|
||||
|
||||
int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
|
||||
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
|
||||
int taosReadQitemFromQset(taos_qset, void **pItem, void **ahandle, FProcessItem *);
|
||||
int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **ahandle, FProcessItems *);
|
||||
|
||||
int taosGetQueueItemsNumber(taos_queue param);
|
||||
int taosGetQsetItemsNumber(taos_qset param);
|
||||
|
|
|
@ -22,13 +22,6 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef int32_t (*ProcessStartFp)(void *ahandle, void *pMsg, int32_t qtype);
|
||||
typedef void (*ProcessEndFp)(void *ahandle, void *pMsg, int32_t qtype, int32_t code);
|
||||
|
||||
typedef bool (*ProcessWriteStartFp)(void *ahandle, void *pMsg, int32_t qtype);
|
||||
typedef void (*ProcessWriteSyncFp)(void *ahandle, int32_t code);
|
||||
typedef void (*ProcessWriteEndFp)(void *ahandle, void *pMsg, int32_t qtype);
|
||||
|
||||
typedef struct SWorker {
|
||||
int32_t id; // worker ID
|
||||
pthread_t thread; // thread
|
||||
|
@ -40,41 +33,36 @@ typedef struct SWorkerPool {
|
|||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
taos_qset qset;
|
||||
const char * name;
|
||||
ProcessStartFp startFp;
|
||||
ProcessEndFp endFp;
|
||||
SWorker * workers;
|
||||
const char *name;
|
||||
SWorker *workers;
|
||||
pthread_mutex_t mutex;
|
||||
} SWorkerPool;
|
||||
|
||||
typedef struct SWriteWorker {
|
||||
int32_t id; // worker id
|
||||
pthread_t thread; // thread
|
||||
taos_qall qall;
|
||||
taos_qset qset; // queue set
|
||||
struct SWriteWorkerPool *pool;
|
||||
} SWriteWorker;
|
||||
typedef struct SMWorker {
|
||||
int32_t id; // worker id
|
||||
pthread_t thread; // thread
|
||||
taos_qall qall;
|
||||
taos_qset qset; // queue set
|
||||
struct SMWorkerPool *pool;
|
||||
} SMWorker;
|
||||
|
||||
typedef struct SWriteWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
const char * name;
|
||||
ProcessWriteStartFp startFp;
|
||||
ProcessWriteSyncFp syncFp;
|
||||
ProcessWriteEndFp endFp;
|
||||
SWriteWorker * workers;
|
||||
pthread_mutex_t mutex;
|
||||
} SWriteWorkerPool;
|
||||
typedef struct SMWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
const char *name;
|
||||
SMWorker *workers;
|
||||
pthread_mutex_t mutex;
|
||||
} SMWorkerPool;
|
||||
|
||||
int32_t tWorkerInit(SWorkerPool *pool);
|
||||
void tWorkerCleanup(SWorkerPool *pool);
|
||||
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle);
|
||||
taos_queue tWorkerAllocQueue(SWorkerPool *pool, void *ahandle, FProcessItem fp);
|
||||
void tWorkerFreeQueue(SWorkerPool *pool, taos_queue queue);
|
||||
|
||||
int32_t tWriteWorkerInit(SWriteWorkerPool *pool);
|
||||
void tWriteWorkerCleanup(SWriteWorkerPool *pool);
|
||||
taos_queue tWriteWorkerAllocQueue(SWriteWorkerPool *pool, void *ahandle);
|
||||
void tWriteWorkerFreeQueue(SWriteWorkerPool *pool, taos_queue queue);
|
||||
int32_t tMWorkerInit(SMWorkerPool *pool);
|
||||
void tMWorkerCleanup(SMWorkerPool *pool);
|
||||
taos_queue tMWorkerAllocQueue(SMWorkerPool *pool, void *ahandle, FProcessItems fp);
|
||||
void tMWorkerFreeQueue(SMWorkerPool *pool, taos_queue queue);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -3,4 +3,4 @@ add_subdirectory(util)
|
|||
add_subdirectory(common)
|
||||
add_subdirectory(libs)
|
||||
add_subdirectory(client)
|
||||
add_subdirectory(server)
|
||||
add_subdirectory(dnode)
|
|
@ -0,0 +1,4 @@
|
|||
add_subdirectory(mnode)
|
||||
add_subdirectory(vnode)
|
||||
add_subdirectory(qnode)
|
||||
add_subdirectory(mgmt)
|
|
@ -1,7 +1,7 @@
|
|||
aux_source_directory(src DNODE_SRC)
|
||||
add_library(dnode ${DNODE_SRC})
|
||||
add_executable(taosd ${DNODE_SRC})
|
||||
target_link_libraries(
|
||||
dnode
|
||||
taosd
|
||||
PUBLIC cjson
|
||||
PUBLIC mnode
|
||||
PUBLIC vnode
|
||||
|
@ -10,7 +10,7 @@ target_link_libraries(
|
|||
PUBLIC taos
|
||||
)
|
||||
target_include_directories(
|
||||
dnode
|
||||
taosd
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/server/dnode"
|
||||
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
)
|
||||
)
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DNODE_EPS_H_
|
||||
#define _TD_DNODE_EPS_H_
|
||||
#ifndef _TD_DNODE_CONFIG_H_
|
||||
#define _TD_DNODE_CONFIG_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -39,4 +39,4 @@ void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DNODE_EPS_H_*/
|
||||
#endif /*_TD_DNODE_CONFIG_H_*/
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DNODE_DNODE_H_
|
||||
#define _TD_DNODE_DNODE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dnodeInt.h"
|
||||
|
||||
int32_t dnodeInitMsg();
|
||||
void dnodeCleanupMsg();
|
||||
void dnodeProcessStatusRsp(SRpcMsg *pMsg);
|
||||
void dnodeProcessStartupReq(SRpcMsg *pMsg);
|
||||
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DNODE_DNODE_H_*/
|
|
@ -25,7 +25,6 @@ extern "C" {
|
|||
#include "tlog.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "dnode.h"
|
||||
|
||||
extern int32_t dDebugFlag;
|
||||
|
||||
|
@ -38,6 +37,9 @@ extern int32_t dDebugFlag;
|
|||
|
||||
typedef enum { DN_RUN_STAT_INIT, DN_RUN_STAT_RUNNING, DN_RUN_STAT_STOPPED } EDnStat;
|
||||
|
||||
int32_t dnodeInit();
|
||||
void dnodeCleanup();
|
||||
|
||||
EDnStat dnodeGetRunStat();
|
||||
void dnodeSetRunStat();
|
||||
void dnodeGetStartup(SStartupStep *);
|
|
@ -0,0 +1,30 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DNODE_MNODE_H_
|
||||
#define _TD_DNODE_MNODE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "dnodeInt.h"
|
||||
|
||||
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DNODE_MNODE_H_*/
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DNODE_TRANS_H_
|
||||
#define _TD_DNODE_TRANS_H_
|
||||
#ifndef _TD_DNODE_TRANSPORT_H_
|
||||
#define _TD_DNODE_TRANSPORT_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -30,4 +30,4 @@ void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DNODE_TRANS_H_*/
|
||||
#endif /*_TD_DNODE_TRANSPORT_H_*/
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DNODE_STATUS_H_
|
||||
#define _TD_DNODE_STATUS_H_
|
||||
#ifndef _TD_DNODE_VNODES_H_
|
||||
#define _TD_DNODE_VNODES_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -32,4 +32,4 @@ void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DNODE_STATUS_H_*/
|
||||
#endif /*_TD_DNODE_VNODES_H_*/
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnodeEps.h"
|
||||
#include "dnodeConfig.h"
|
||||
#include "cJSON.h"
|
||||
#include "thash.h"
|
||||
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnodeDnode.h"
|
||||
#include "dnodeConfig.h"
|
||||
#include "mnode.h"
|
||||
#include "tthread.h"
|
||||
#include "ttime.h"
|
||||
#include "vnode.h"
|
||||
|
||||
static struct {
|
||||
pthread_t *threadId;
|
||||
bool stop;
|
||||
uint32_t rebootTime;
|
||||
} tsMsg;
|
||||
|
||||
static void dnodeSendStatusMsg() {
|
||||
int32_t contLen = sizeof(SStatusMsg) + TSDB_MAX_VNODES * sizeof(SVnodeLoad);
|
||||
SStatusMsg *pStatus = rpcMallocCont(contLen);
|
||||
if (pStatus == NULL) {
|
||||
dError("failed to malloc status message");
|
||||
return;
|
||||
}
|
||||
|
||||
pStatus->version = htonl(tsVersion);
|
||||
pStatus->dnodeId = htonl(dnodeGetDnodeId());
|
||||
tstrncpy(pStatus->dnodeEp, tsLocalEp, TSDB_EP_LEN);
|
||||
pStatus->clusterId = htobe64(dnodeGetClusterId());
|
||||
pStatus->lastReboot = htonl(tsMsg.rebootTime);
|
||||
pStatus->numOfCores = htonl(tsNumOfCores);
|
||||
pStatus->diskAvailable = tsAvailDataDirGB;
|
||||
|
||||
// fill cluster cfg parameters
|
||||
pStatus->clusterCfg.statusInterval = htonl(tsStatusInterval);
|
||||
pStatus->clusterCfg.checkTime = 0;
|
||||
tstrncpy(pStatus->clusterCfg.timezone, tsTimezone, 64);
|
||||
char timestr[32] = "1970-01-01 00:00:00.00";
|
||||
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
||||
tstrncpy(pStatus->clusterCfg.locale, tsLocale, TSDB_LOCALE_LEN);
|
||||
tstrncpy(pStatus->clusterCfg.charset, tsCharset, TSDB_LOCALE_LEN);
|
||||
|
||||
// vnodeGetStatus(NULL, pStatus);
|
||||
// contLen = sizeof(SStatusMsg) + pStatus->openVnodes * sizeof(SVnodeLoad);
|
||||
// pStatus->openVnodes = htons(pStatus->openVnodes);
|
||||
|
||||
SRpcMsg rpcMsg = {.ahandle = NULL, .pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_DM_STATUS};
|
||||
|
||||
dnodeSendMsgToMnode(&rpcMsg);
|
||||
}
|
||||
|
||||
void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
|
||||
dTrace("status rsp is received, code:%s", tstrerror(pMsg->code));
|
||||
if (pMsg->code != TSDB_CODE_SUCCESS) return;
|
||||
|
||||
SStatusRsp *pStatusRsp = pMsg->pCont;
|
||||
|
||||
SDnodeCfg *pCfg = &pStatusRsp->dnodeCfg;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->clusterId = htobe64(pCfg->clusterId);
|
||||
pCfg->numOfVnodes = htonl(pCfg->numOfVnodes);
|
||||
pCfg->numOfDnodes = htonl(pCfg->numOfDnodes);
|
||||
dnodeUpdateCfg(pCfg);
|
||||
|
||||
if (pCfg->dropped) {
|
||||
dError("status rsp is received, and set dnode to drop status");
|
||||
return;
|
||||
}
|
||||
|
||||
// vnodeSetAccess(pStatusRsp->vgAccess, pCfg->numOfVnodes);
|
||||
|
||||
SDnodeEps *eps = (SDnodeEps *)((char *)pStatusRsp->vgAccess + pCfg->numOfVnodes * sizeof(SVgroupAccess));
|
||||
eps->dnodeNum = htonl(eps->dnodeNum);
|
||||
for (int32_t i = 0; i < eps->dnodeNum; ++i) {
|
||||
eps->dnodeEps[i].dnodeId = htonl(eps->dnodeEps[i].dnodeId);
|
||||
eps->dnodeEps[i].dnodePort = htons(eps->dnodeEps[i].dnodePort);
|
||||
}
|
||||
|
||||
dnodeUpdateDnodeEps(eps);
|
||||
}
|
||||
|
||||
static void *dnodeThreadRoutine(void *param) {
|
||||
int32_t ms = tsStatusInterval * 1000;
|
||||
while (!tsMsg.stop) {
|
||||
taosMsleep(ms);
|
||||
dnodeSendStatusMsg();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dnodeInitMsg() {
|
||||
tsMsg.stop = false;
|
||||
tsMsg.rebootTime = taosGetTimestampSec();
|
||||
tsMsg.threadId = taosCreateThread(dnodeThreadRoutine, NULL);
|
||||
if (tsMsg.threadId == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("dnode msg is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupMsg() {
|
||||
if (tsMsg.threadId != NULL) {
|
||||
tsMsg.stop = true;
|
||||
taosDestoryThread(tsMsg.threadId);
|
||||
tsMsg.threadId = NULL;
|
||||
}
|
||||
|
||||
dInfo("dnode msg is cleanuped");
|
||||
}
|
||||
|
||||
static int32_t dnodeStartMnode(SRpcMsg *pMsg) {
|
||||
SCreateMnodeMsg *pCfg = pMsg->pCont;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
pCfg->mnodeNum = htonl(pCfg->mnodeNum);
|
||||
for (int32_t i = 0; i < pCfg->mnodeNum; ++i) {
|
||||
pCfg->mnodeEps[i].dnodeId = htonl(pCfg->mnodeEps[i].dnodeId);
|
||||
pCfg->mnodeEps[i].dnodePort = htons(pCfg->mnodeEps[i].dnodePort);
|
||||
}
|
||||
|
||||
if (pCfg->dnodeId != dnodeGetDnodeId()) {
|
||||
dDebug("dnode:%d, in create meps msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
|
||||
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
|
||||
}
|
||||
|
||||
if (mnodeGetStatus() == MN_STATUS_READY) return 0;
|
||||
|
||||
return mnodeDeploy();
|
||||
}
|
||||
|
||||
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg) {
|
||||
int32_t code = dnodeStartMnode(pMsg);
|
||||
|
||||
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
|
||||
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
|
||||
SCfgDnodeMsg *pCfg = pMsg->pCont;
|
||||
|
||||
int32_t code = taosCfgDynamicOptions(pCfg->config);
|
||||
|
||||
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code};
|
||||
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
void dnodeProcessStartupReq(SRpcMsg *pMsg) {
|
||||
dInfo("startup msg is received, cont:%s", (char *)pMsg->pCont);
|
||||
|
||||
SStartupStep *pStep = rpcMallocCont(sizeof(SStartupStep));
|
||||
dnodeGetStartup(pStep);
|
||||
|
||||
dDebug("startup msg is sent, step:%s desc:%s finished:%d", pStep->name, pStep->desc, pStep->finished);
|
||||
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = pStep, .contLen = sizeof(SStartupStep)};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnodeCheck.h"
|
||||
#include "dnodeConfig.h"
|
||||
#include "dnodeDnode.h"
|
||||
#include "dnodeTransport.h"
|
||||
#include "mnode.h"
|
||||
#include "sync.h"
|
||||
#include "tcache.h"
|
||||
#include "tconfig.h"
|
||||
#include "tnote.h"
|
||||
#include "tstep.h"
|
||||
#include "vnode.h"
|
||||
#include "wal.h"
|
||||
|
||||
static struct {
|
||||
EDnStat runStatus;
|
||||
SStartupStep startup;
|
||||
SSteps *steps;
|
||||
} tsDnode;
|
||||
|
||||
EDnStat dnodeGetRunStat() { return tsDnode.runStatus; }
|
||||
|
||||
void dnodeSetRunStat(EDnStat stat) { tsDnode.runStatus = stat; }
|
||||
|
||||
void dnodeReportStartup(char *name, char *desc) {
|
||||
SStartupStep *startup = &tsDnode.startup;
|
||||
tstrncpy(startup->name, name, strlen(startup->name));
|
||||
tstrncpy(startup->desc, desc, strlen(startup->desc));
|
||||
startup->finished = 0;
|
||||
}
|
||||
|
||||
static void dnodeReportStartupFinished(char *name, char *desc) {
|
||||
SStartupStep *startup = &tsDnode.startup;
|
||||
tstrncpy(startup->name, name, strlen(startup->name));
|
||||
tstrncpy(startup->desc, desc, strlen(startup->desc));
|
||||
startup->finished = 1;
|
||||
}
|
||||
|
||||
void dnodeGetStartup(SStartupStep *pStep) { memcpy(pStep, &tsDnode.startup, sizeof(SStartupStep)); }
|
||||
|
||||
static int32_t dnodeInitVnode() {
|
||||
return vnodeInit();
|
||||
}
|
||||
|
||||
static int32_t dnodeInitMnode() {
|
||||
SMnodePara para;
|
||||
para.fp.GetDnodeEp = dnodeGetEp;
|
||||
para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
|
||||
para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
|
||||
para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
|
||||
para.dnodeId = dnodeGetDnodeId();
|
||||
para.clusterId = dnodeGetClusterId();
|
||||
|
||||
return mnodeInit(para);
|
||||
}
|
||||
|
||||
static int32_t dnodeInitTfs() {}
|
||||
|
||||
static int32_t dnodeInitMain() {
|
||||
tsDnode.runStatus = DN_RUN_STAT_STOPPED;
|
||||
tscEmbedded = 1;
|
||||
taosIgnSIGPIPE();
|
||||
taosBlockSIGPIPE();
|
||||
taosResolveCRC();
|
||||
taosInitGlobalCfg();
|
||||
taosReadGlobalLogCfg();
|
||||
taosSetCoreDump(tsEnableCoreFile);
|
||||
|
||||
if (!taosMkDir(tsLogDir)) {
|
||||
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
char temp[TSDB_FILENAME_LEN];
|
||||
sprintf(temp, "%s/taosdlog", tsLogDir);
|
||||
if (taosInitLog(temp, tsNumOfLogLines, 1) < 0) {
|
||||
printf("failed to init log file\n");
|
||||
}
|
||||
|
||||
if (!taosReadGlobalCfg()) {
|
||||
taosPrintGlobalCfg();
|
||||
dError("TDengine read global config failed");
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("start to initialize TDengine");
|
||||
|
||||
taosInitNotes();
|
||||
|
||||
return taosCheckGlobalCfg();
|
||||
}
|
||||
|
||||
static void dnodeCleanupMain() {
|
||||
taos_cleanup();
|
||||
taosCloseLog();
|
||||
taosStopCacheRefreshWorker();
|
||||
}
|
||||
|
||||
static int32_t dnodeCheckRunning(char *dir) {
|
||||
char filepath[256] = {0};
|
||||
snprintf(filepath, sizeof(filepath), "%s/.running", dir);
|
||||
|
||||
FileFd fd = taosOpenFileCreateWriteTrunc(filepath);
|
||||
if (fd < 0) {
|
||||
dError("failed to open lock file:%s since %s, quit", filepath, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t ret = taosLockFile(fd);
|
||||
if (ret != 0) {
|
||||
dError("failed to lock file:%s since %s, quit", filepath, strerror(errno));
|
||||
taosCloseFile(fd);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dnodeInitDir() {
|
||||
sprintf(tsMnodeDir, "%s/mnode", tsDataDir);
|
||||
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
|
||||
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
|
||||
|
||||
if (!taosMkDir(tsDnodeDir)) {
|
||||
dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!taosMkDir(tsMnodeDir)) {
|
||||
dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!taosMkDir(tsVnodeDir)) {
|
||||
dError("failed to create dir:%s since %s", tsVnodeDir, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dnodeCheckRunning(tsDnodeDir) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dnodeCleanupDir() {}
|
||||
|
||||
int32_t dnodeInit() {
|
||||
SSteps *steps = taosStepInit(24, dnodeReportStartup);
|
||||
if (steps == NULL) return -1;
|
||||
|
||||
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
|
||||
taosStepAdd(steps, "dnode-dir", dnodeInitDir, dnodeCleanupDir);
|
||||
taosStepAdd(steps, "dnode-check", dnodeInitCheck, dnodeCleanupCheck);
|
||||
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
|
||||
taosStepAdd(steps, "dnode-tfs", dnodeInitTfs, NULL);
|
||||
taosStepAdd(steps, "dnode-wal", walInit, walCleanUp);
|
||||
taosStepAdd(steps, "dnode-sync", syncInit, syncCleanUp);
|
||||
taosStepAdd(steps, "dnode-eps", dnodeInitEps, dnodeCleanupEps);
|
||||
taosStepAdd(steps, "dnode-vnode", dnodeInitVnode, vnodeCleanup);
|
||||
taosStepAdd(steps, "dnode-mnode", dnodeInitMnode, mnodeCleanup);
|
||||
taosStepAdd(steps, "dnode-trans", dnodeInitTrans, dnodeCleanupTrans);
|
||||
taosStepAdd(steps, "dnode-msg", dnodeInitMsg, dnodeCleanupMsg);
|
||||
|
||||
tsDnode.steps = steps;
|
||||
taosStepExec(tsDnode.steps);
|
||||
|
||||
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
|
||||
dnodeReportStartupFinished("TDengine", "initialized successfully");
|
||||
dInfo("TDengine is initialized successfully");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanup() {
|
||||
if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
|
||||
dnodeSetRunStat(DN_RUN_STAT_STOPPED);
|
||||
taosStepCleanup(tsDnode.steps);
|
||||
tsDnode.steps = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg;
|
||||
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg;
|
||||
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg;
|
||||
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg;
|
||||
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg;
|
||||
// tsVnode.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg;
|
||||
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
#include "os.h"
|
||||
#include "ulog.h"
|
||||
#include "dnode.h"
|
||||
#include "dnodeInt.h"
|
||||
|
||||
static bool stop = false;
|
||||
static void sigintHandler(int32_t signum, void *info, void *ctx) { stop = true; }
|
|
@ -0,0 +1,382 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
/* this file is mainly responsible for the communication between DNODEs. Each
|
||||
* dnode works as both server and client. Dnode may send status, grant, config
|
||||
* messages to mnode, mnode may send create/alter/drop table/vnode messages
|
||||
* to dnode. All theses messages are handled from here
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dnodeTransport.h"
|
||||
#include "dnodeConfig.h"
|
||||
#include "dnodeDnode.h"
|
||||
#include "dnodeMnode.h"
|
||||
#include "dnodeVnodes.h"
|
||||
#include "mnode.h"
|
||||
#include "vnode.h"
|
||||
|
||||
typedef void (*MsgFp)(SRpcMsg *pMsg);
|
||||
|
||||
static struct {
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
void *shellRpc;
|
||||
MsgFp msgFp[TSDB_MSG_TYPE_MAX];
|
||||
} tsTrans;
|
||||
|
||||
static void dnodeInitMsgFp() {
|
||||
// msg from client to dnode
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_TABLE_META] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_TABLES_META] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
|
||||
|
||||
// msg from client to mnode
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_TOPIC] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_TOPIC] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_TOPIC] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_FUNCTION] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE_FUNC] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessMsg;
|
||||
|
||||
// message from mnode to dnode
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_STABLE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_STABLE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_STABLE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_VNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_ALTER_VNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_VNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_SYNC_VNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN] = vnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN] = dnodeProcessCreateMnodeReq;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN] = NULL;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DROP_MNODE_IN_RSP] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN] = NULL;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE_IN_RSP] = mnodeProcessMsg;
|
||||
|
||||
// message from dnode to mnode
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = NULL;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = NULL;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
|
||||
tsTrans.msgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
|
||||
}
|
||||
|
||||
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
|
||||
int32_t msgType = pMsg->msgType;
|
||||
|
||||
if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
|
||||
dnodeProcessStartupReq(pMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
|
||||
rspMsg.code = TSDB_CODE_APP_NOT_READY;
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pMsg->pCont == NULL) {
|
||||
rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
|
||||
rpcSendResponse(&rspMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
MsgFp fp = tsTrans.msgFp[msgType];
|
||||
if (fp != NULL) {
|
||||
dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
|
||||
(*fp)(pMsg);
|
||||
} else {
|
||||
dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
|
||||
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t dnodeInitServer() {
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = tsDnodeDnodePort;
|
||||
rpcInit.label = "DND-S";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = dnodeProcessPeerReq;
|
||||
rpcInit.sessions = TSDB_MAX_VNODES << 4;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
|
||||
tsTrans.serverRpc = rpcOpen(&rpcInit);
|
||||
if (tsTrans.serverRpc == NULL) {
|
||||
dError("failed to init peer rpc server");
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("dnode peer rpc server is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dnodeCleanupServer() {
|
||||
if (tsTrans.serverRpc) {
|
||||
rpcClose(tsTrans.serverRpc);
|
||||
tsTrans.serverRpc = NULL;
|
||||
dInfo("dnode peer server is closed");
|
||||
}
|
||||
}
|
||||
|
||||
static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||
int32_t msgType = pMsg->msgType;
|
||||
|
||||
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
|
||||
if (pMsg == NULL || pMsg->pCont == NULL) return;
|
||||
dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
|
||||
dnodeUpdateMnodeEps(pEpSet);
|
||||
}
|
||||
|
||||
MsgFp fp = tsTrans.msgFp[msgType];
|
||||
if (fp != NULL) {
|
||||
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
|
||||
(*fp)(pMsg);
|
||||
} else {
|
||||
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
|
||||
}
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
static int32_t dnodeInitClient() {
|
||||
char secret[TSDB_KEY_LEN] = "secret";
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.label = "DND-C";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = dnodeProcessPeerRsp;
|
||||
rpcInit.sessions = TSDB_MAX_VNODES << 4;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.user = "t";
|
||||
rpcInit.ckey = "key";
|
||||
rpcInit.secret = secret;
|
||||
|
||||
tsTrans.clientRpc = rpcOpen(&rpcInit);
|
||||
if (tsTrans.clientRpc == NULL) {
|
||||
dError("failed to init peer rpc client");
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("dnode peer rpc client is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dnodeCleanupClient() {
|
||||
if (tsTrans.clientRpc) {
|
||||
rpcClose(tsTrans.clientRpc);
|
||||
tsTrans.clientRpc = NULL;
|
||||
dInfo("dnode peer rpc client is closed");
|
||||
}
|
||||
}
|
||||
|
||||
static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
||||
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
|
||||
int32_t msgType = pMsg->msgType;
|
||||
|
||||
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
|
||||
dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
|
||||
rspMsg.code = TSDB_CODE_DND_EXITING;
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
} else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
|
||||
dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
|
||||
rspMsg.code = TSDB_CODE_APP_NOT_READY;
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pMsg->pCont == NULL) {
|
||||
rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
|
||||
rpcSendResponse(&rspMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
MsgFp fp = tsTrans.msgFp[msgType];
|
||||
if (fp != NULL) {
|
||||
dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
|
||||
(*fp)(pMsg);
|
||||
} else {
|
||||
dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
|
||||
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
}
|
||||
|
||||
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
|
||||
|
||||
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
|
||||
SRpcEpSet epSet = {0};
|
||||
dnodeGetEpSetForPeer(&epSet);
|
||||
dnodeSendMsgToDnode(&epSet, rpcMsg);
|
||||
}
|
||||
|
||||
static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
|
||||
SRpcEpSet epSet = {0};
|
||||
dnodeGetEpSetForPeer(&epSet);
|
||||
rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
|
||||
}
|
||||
|
||||
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
|
||||
int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
|
||||
if (code != TSDB_CODE_APP_NOT_READY) return code;
|
||||
|
||||
SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg));
|
||||
tstrncpy(pMsg->user, user, sizeof(pMsg->user));
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.pCont = pMsg;
|
||||
rpcMsg.contLen = sizeof(SAuthMsg);
|
||||
rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH;
|
||||
|
||||
dDebug("user:%s, send auth msg to mnodes", user);
|
||||
SRpcMsg rpcRsp = {0};
|
||||
dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp);
|
||||
|
||||
if (rpcRsp.code != 0) {
|
||||
dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code));
|
||||
} else {
|
||||
SAuthRsp *pRsp = rpcRsp.pCont;
|
||||
dDebug("user:%s, auth msg received from mnodes", user);
|
||||
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
|
||||
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
|
||||
*spi = pRsp->spi;
|
||||
*encrypt = pRsp->encrypt;
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcRsp.pCont);
|
||||
return rpcRsp.code;
|
||||
}
|
||||
|
||||
static int32_t dnodeInitShell() {
|
||||
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
|
||||
if (numOfThreads < 1) {
|
||||
numOfThreads = 1;
|
||||
}
|
||||
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = tsDnodeShellPort;
|
||||
rpcInit.label = "SHELL";
|
||||
rpcInit.numOfThreads = numOfThreads;
|
||||
rpcInit.cfp = dnodeProcessShellReq;
|
||||
rpcInit.sessions = tsMaxShellConns;
|
||||
rpcInit.connType = TAOS_CONN_SERVER;
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.afp = dnodeRetrieveUserAuthInfo;
|
||||
|
||||
tsTrans.shellRpc = rpcOpen(&rpcInit);
|
||||
if (tsTrans.shellRpc == NULL) {
|
||||
dError("failed to init shell rpc server");
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("dnode shell rpc server is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dnodeCleanupShell() {
|
||||
if (tsTrans.shellRpc) {
|
||||
rpcClose(tsTrans.shellRpc);
|
||||
tsTrans.shellRpc = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dnodeInitTrans() {
|
||||
if (dnodeInitClient() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dnodeInitServer() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dnodeInitShell() != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dnodeCleanupTrans() {
|
||||
dnodeCleanupShell();
|
||||
dnodeCleanupServer();
|
||||
dnodeCleanupClient();
|
||||
}
|
|
@ -28,10 +28,10 @@ static struct {
|
|||
SWorkerPool write;
|
||||
SWorkerPool peerReq;
|
||||
SWorkerPool peerRsp;
|
||||
taos_queue readQ;
|
||||
taos_queue writeQ;
|
||||
taos_queue peerReqQ;
|
||||
taos_queue peerRspQ;
|
||||
taos_queue readQ;
|
||||
taos_queue writeQ;
|
||||
taos_queue peerReqQ;
|
||||
taos_queue peerRspQ;
|
||||
int32_t (*writeMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
|
||||
int32_t (*readMsgFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
|
||||
int32_t (*peerReqFp[TSDB_MSG_TYPE_MAX])(SMnMsg *);
|
||||
|
@ -81,7 +81,7 @@ static void mnodeDispatchToWriteQueue(SRpcMsg *pRpcMsg) {
|
|||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into wqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg);
|
||||
taosWriteQitem(tsMworker.writeQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,7 @@ void mnodeReDispatchToWriteQueue(SMnMsg *pMsg) {
|
|||
mnodeSendRedirectMsg(&pMsg->rpcMsg, true);
|
||||
mnodeCleanupMsg(pMsg);
|
||||
} else {
|
||||
taosWriteQitem(tsMworker.writeQ, TAOS_QTYPE_RPC, pMsg);
|
||||
taosWriteQitem(tsMworker.writeQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,7 +107,7 @@ static void mnodeDispatchToReadQueue(SRpcMsg *pRpcMsg) {
|
|||
rpcSendResponse(&rpcRsp);
|
||||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into rqueue", pMsg, pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.readQ, TAOS_QTYPE_RPC, pMsg);
|
||||
taosWriteQitem(tsMworker.readQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -125,7 +125,7 @@ static void mnodeDispatchToPeerQueue(SRpcMsg *pRpcMsg) {
|
|||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into peer req queue", pMsg, pMsg->rpcMsg.ahandle,
|
||||
taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.peerReqQ, TAOS_QTYPE_RPC, pMsg);
|
||||
taosWriteQitem(tsMworker.peerReqQ, pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,13 +140,13 @@ void mnodeDispatchToPeerRspQueue(SRpcMsg *pRpcMsg) {
|
|||
} else {
|
||||
mTrace("msg:%p, app:%p type:%s is put into peer rsp queue", pMsg, pMsg->rpcMsg.ahandle,
|
||||
taosMsg[pMsg->rpcMsg.msgType]);
|
||||
taosWriteQitem(tsMworker.peerRspQ, TAOS_QTYPE_RPC, pMsg);
|
||||
taosWriteQitem(tsMworker.peerRspQ, pMsg);
|
||||
}
|
||||
|
||||
// rpcFreeCont(pRpcMsg->pCont);
|
||||
}
|
||||
|
||||
static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
|
||||
void mnodeSendRsp(SMnMsg *pMsg, int32_t code) {
|
||||
if (pMsg == NULL) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
|
||||
if (code == TSDB_CODE_MND_ACTION_NEED_REPROCESSED) {
|
||||
|
@ -155,22 +155,16 @@ static void mnodeSendRpcRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t
|
|||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcMsg.handle,
|
||||
.pCont = pMsg->rpcRsp.rsp,
|
||||
.contLen = pMsg->rpcRsp.len,
|
||||
.code = code,
|
||||
.handle = pMsg->rpcMsg.handle,
|
||||
.pCont = pMsg->rpcRsp.rsp,
|
||||
.contLen = pMsg->rpcRsp.len,
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
mnodeCleanupMsg(pMsg);
|
||||
}
|
||||
|
||||
void mnodeSendRsp(SMnMsg *pMsg, int32_t code) { mnodeSendRpcRsp(NULL, pMsg, 0, code); }
|
||||
|
||||
static void mnodeProcessPeerRspEnd(void *ahandle, SMnMsg *pMsg, int32_t qtype, int32_t code) {
|
||||
mnodeCleanupMsg(pMsg);
|
||||
}
|
||||
|
||||
static void mnodeInitMsgFp() {
|
||||
// // peer req
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeDispatchToPeerQueue;
|
||||
|
@ -207,96 +201,98 @@ static void mnodeInitMsgFp() {
|
|||
// tsMworker.peerRspFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessDropVnodeRsp;
|
||||
|
||||
// // read msg
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessHeartBeatMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessConnectMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessUseMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_HEARTBEAT] = mnodeProcessHeartBeatMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CONNECT] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CONNECT] = mnodeProcessConnectMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_USE_DB] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_USE_DB] = mnodeProcessUseMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessTableMetaMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMultiTableMetaMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessSuperTableVgroupMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_TABLE_META] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_TABLE_META] = mnodeProcessTableMetaMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_TABLES_META] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_TABLES_META] = mnodeProcessMultiTableMetaMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = mnodeProcessSuperTableVgroupMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessShowMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessRetrieveMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessRetrieveFuncReq;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_SHOW] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_SHOW] = mnodeProcessShowMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_SHOW_RETRIEVE] = mnodeProcessRetrieveMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_RETRIEVE_FUNC] = mnodeDispatchToReadQueue;
|
||||
// tsMworker.readMsgFp[TSDB_MSG_TYPE_RETRIEVE_FUNC] = mnodeProcessRetrieveFuncReq;
|
||||
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = acctProcessCreateAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = acctProcessDropAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = acctProcessAlterAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CREATE_ACCT] = acctProcessCreateAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_ALTER_ACCT] = acctProcessDropAcctMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_DROP_ACCT] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_DROP_ACCT] = acctProcessAlterAcctMsg;
|
||||
|
||||
// // write msg
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessCreateUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessAlterUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessDropUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_USER] = mnodeProcessCreateUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_USER] = mnodeProcessAlterUserMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_USER] = mnodeProcessDropUserMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessCreateDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessDropDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessCfgDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_DNODE] = mnodeProcessCreateDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_DNODE] = mnodeProcessDropDnodeMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CONFIG_DNODE] = mnodeProcessCfgDnodeMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessCreateDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessAlterDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessDropDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessSyncDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessCompactMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_DB] = mnodeProcessCreateDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_DB] = mnodeProcessAlterDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_DB] = mnodeProcessDropDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_SYNC_DB] = mnodeProcessSyncDbMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_COMPACT_VNODE] = mnodeProcessCompactMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessCreateFuncMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessDropFuncMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_FUNCTION] = mnodeProcessCreateFuncMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_FUNCTION] = mnodeProcessDropFuncMsg;
|
||||
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = tpProcessCreateTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = tpProcessAlterTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = tpProcessDropTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_CREATE_TP] = tpProcessCreateTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_DROP_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_DROP_TP] = tpProcessAlterTpMsg;
|
||||
// // tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_TP] = mnodeDispatchToWriteQueue;
|
||||
// // tsMworker.readMsgFp[TSDB_MSG_TYPE_ALTER_TP] = tpProcessDropTpMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessCreateTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessDropTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessAlterTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CREATE_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CREATE_TABLE] = mnodeProcessCreateTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_DROP_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_DROP_TABLE] = mnodeProcessDropTableMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_TABLE] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_TABLE] = mnodeProcessAlterTableMsg;
|
||||
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = NULL;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessKillQueryMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessKillStreamMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessKillConnectionMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_ALTER_STREAM] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_ALTER_STREAM] = NULL;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_QUERY] = mnodeProcessKillQueryMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_STREAM] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_STREAM] = mnodeProcessKillStreamMsg;
|
||||
// tsMworker.msgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeDispatchToWriteQueue;
|
||||
// tsMworker.writeMsgFp[TSDB_MSG_TYPE_KILL_CONN] = mnodeProcessKillConnectionMsg;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
|
||||
static void mnodeProcessWriteReq(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, app:%p type:%s content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
return TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
|
@ -309,31 +305,39 @@ static int32_t mnodeProcessWriteReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
|
|||
mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
|
||||
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
|
||||
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.writeMsgFp[msgType] == NULL) {
|
||||
mError("msg:%p, app:%p type:%s not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_WRITE_REQ_END;
|
||||
}
|
||||
|
||||
return (*tsMworker.writeMsgFp[msgType])(pMsg);
|
||||
code = (*tsMworker.writeMsgFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_WRITE_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) {
|
||||
static void mnodeProcessReadReq(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, app:%p type:%s in mread queue, content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
return TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
SMnRsp *rpcRsp = &pMsg->rpcRsp;
|
||||
SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet));
|
||||
if (!epSet) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
mnodeGetMnodeEpSetForShell(epSet, true);
|
||||
rpcRsp->rsp = epSet;
|
||||
|
@ -341,25 +345,32 @@ static int32_t mnodeProcessReadReq(void* unused, SMnMsg *pMsg, int32_t qtype) {
|
|||
|
||||
mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle, taosMsg[msgType],
|
||||
epSet->numOfEps, epSet->inUse);
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.readMsgFp[msgType] == NULL) {
|
||||
mError("msg:%p, app:%p type:%s in mread queue, not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_READ_REQ_END;
|
||||
}
|
||||
|
||||
mTrace("msg:%p, app:%p type:%s will be processed in mread queue", pMsg, ahandle, taosMsg[msgType]);
|
||||
return (*tsMworker.readMsgFp[msgType])(pMsg);
|
||||
code = (*tsMworker.readMsgFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_READ_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
|
||||
static void mnodeProcessPeerReq(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
void * ahandle = pMsg->rpcMsg.ahandle;
|
||||
void *ahandle = pMsg->rpcMsg.ahandle;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMsg->rpcMsg.pCont == NULL) {
|
||||
mError("msg:%p, ahandle:%p type:%s in mpeer queue, content is null", pMsg, ahandle, taosMsg[msgType]);
|
||||
return TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
code = TSDB_CODE_MND_INVALID_MSG_LEN;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
|
@ -372,24 +383,29 @@ static int32_t mnodeProcessPeerReq(void *unused, SMnMsg *pMsg, int32_t qtype) {
|
|||
mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, ahandle,
|
||||
taosMsg[msgType], epSet->numOfEps, epSet->inUse);
|
||||
|
||||
return TSDB_CODE_RPC_REDIRECT;
|
||||
code = TSDB_CODE_RPC_REDIRECT;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
if (tsMworker.peerReqFp[msgType] == NULL) {
|
||||
mError("msg:%p, ahandle:%p type:%s in mpeer queue, not processed", pMsg, ahandle, taosMsg[msgType]);
|
||||
return TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
code = TSDB_CODE_MND_MSG_NOT_PROCESSED;
|
||||
goto PROCESS_PEER_REQ_END;
|
||||
}
|
||||
|
||||
return (*tsMworker.peerReqFp[msgType])(pMsg);
|
||||
code = (*tsMworker.peerReqFp[msgType])(pMsg);
|
||||
|
||||
PROCESS_PEER_REQ_END:
|
||||
mnodeSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) {
|
||||
static void mnodeProcessPeerRsp(SMnMsg *pMsg, void *unused) {
|
||||
int32_t msgType = pMsg->rpcMsg.msgType;
|
||||
SRpcMsg *pRpcMsg = &pMsg->rpcMsg;
|
||||
|
||||
if (!mnodeIsMaster()) {
|
||||
mError("msg:%p, ahandle:%p type:%s not processed for not master", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
|
||||
return 0;
|
||||
mnodeCleanupMsg(pMsg);
|
||||
}
|
||||
|
||||
if (tsMworker.peerRspFp[msgType]) {
|
||||
|
@ -398,7 +414,7 @@ static int32_t mnodeProcessPeerRsp(void *ahandle, SMnMsg *pMsg, int32_t qtype) {
|
|||
mError("msg:%p, ahandle:%p type:%s is not processed", pRpcMsg, pRpcMsg->ahandle, taosMsg[msgType]);
|
||||
}
|
||||
|
||||
return 0;
|
||||
mnodeCleanupMsg(pMsg);
|
||||
}
|
||||
|
||||
int32_t mnodeInitWorker() {
|
||||
|
@ -406,20 +422,16 @@ int32_t mnodeInitWorker() {
|
|||
|
||||
SWorkerPool *pPool = &tsMworker.write;
|
||||
pPool->name = "mnode-write";
|
||||
pPool->startFp = (ProcessStartFp)mnodeProcessWriteReq;
|
||||
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
|
||||
pPool->min = 1;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL);
|
||||
tsMworker.writeQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessWriteReq);
|
||||
}
|
||||
|
||||
pPool = &tsMworker.read;
|
||||
pPool->name = "mnode-read";
|
||||
pPool->startFp = (ProcessStartFp)mnodeProcessReadReq;
|
||||
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
|
||||
pPool->min = 2;
|
||||
pPool->max = (int32_t)(tsNumOfCores * tsNumOfThreadsPerCore / 2);
|
||||
pPool->max = MAX(2, pPool->max);
|
||||
|
@ -427,31 +439,27 @@ int32_t mnodeInitWorker() {
|
|||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.readQ = tWorkerAllocQueue(pPool, NULL);
|
||||
tsMworker.readQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessReadReq);
|
||||
}
|
||||
|
||||
pPool = &tsMworker.peerReq;
|
||||
pPool->name = "mnode-peer-req";
|
||||
pPool->startFp = (ProcessStartFp)mnodeProcessPeerReq;
|
||||
pPool->endFp = (ProcessEndFp)mnodeSendRpcRsp;
|
||||
pPool->min = 1;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL);
|
||||
tsMworker.peerReqQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessPeerReq);
|
||||
}
|
||||
|
||||
pPool = &tsMworker.peerRsp;
|
||||
pPool->name = "mnode-peer-rsp";
|
||||
pPool->startFp = (ProcessStartFp)mnodeProcessPeerRsp;
|
||||
pPool->endFp = (ProcessEndFp)mnodeProcessPeerRspEnd;
|
||||
pPool->min = 1;
|
||||
pPool->max = 1;
|
||||
if (tWorkerInit(pPool) != 0) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL);
|
||||
tsMworker.peerRspQ = tWorkerAllocQueue(pPool, NULL, (FProcessItem)mnodeProcessPeerRsp);
|
||||
}
|
||||
|
||||
mInfo("mnode worker is initialized");
|
|
@ -16,6 +16,8 @@
|
|||
#ifndef _TD_VNODE_COMMIT_H_
|
||||
#define _TD_VNODE_COMMIT_H_
|
||||
|
||||
#include "vnodeInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_INT_H_
|
||||
#define _TD_VNODE_INT_H_
|
||||
|
||||
#include "vnode.h"
|
||||
|
||||
#include "amalloc.h"
|
||||
#include "meta.h"
|
||||
#include "sync.h"
|
||||
#include "tlog.h"
|
||||
#include "tq.h"
|
||||
#include "tsdb.h"
|
||||
#include "wal.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
extern int32_t vDebugFlag;
|
||||
|
||||
#define vFatal(...) { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", 255, __VA_ARGS__); }}
|
||||
#define vError(...) { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", 255, __VA_ARGS__); }}
|
||||
#define vWarn(...) { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", 255, __VA_ARGS__); }}
|
||||
#define vInfo(...) { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", 255, __VA_ARGS__); }}
|
||||
#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
|
||||
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
typedef struct SVnode {
|
||||
int32_t vgId;
|
||||
SVnodeCfg cfg;
|
||||
SMeta *pMeta;
|
||||
STsdb *pTsdb;
|
||||
STQ *pTQ;
|
||||
SWal *pWal;
|
||||
SSyncNode *pSync;
|
||||
} SVnode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_INT_H_*/
|
|
@ -16,7 +16,7 @@
|
|||
#ifndef _TD_VNODE_MEM_ALLOCATOR_H_
|
||||
#define _TD_VNODE_MEM_ALLOCATOR_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
|
@ -13,28 +13,18 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_MGMT_H_
|
||||
#define _TD_VNODE_MGMT_H_
|
||||
#ifndef _TD_VNODE_READ_H_
|
||||
#define _TD_VNODE_READ_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "vnodeInt.h"
|
||||
|
||||
|
||||
typedef struct {
|
||||
SVnode *pVnode;
|
||||
SRpcMsg rpcMsg;
|
||||
char pCont[];
|
||||
} SVnMgmtMsg;
|
||||
|
||||
|
||||
int32_t vnodeInitMgmt();
|
||||
void vnodeCleanupMgmt();
|
||||
void vnodeProcessMgmtMsg(SRpcMsg *pMsg);
|
||||
void vnodeProcessReadMsg(SVnode *pVnode, SVnodeMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_MGMT_H_*/
|
||||
#endif /*_TD_VNODE_READ_H_*/
|
|
@ -13,21 +13,18 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_CFG_H_
|
||||
#define _TD_VNODE_CFG_H_
|
||||
#ifndef _TD_VNODE_WRITE_H_
|
||||
#define _TD_VNODE_WRITE_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "vnodeInt.h"
|
||||
|
||||
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
|
||||
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
|
||||
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState);
|
||||
int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState);
|
||||
void vnodeProcessWriteMsg(SVnode* pVnode, SVnodeMsg* pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_CFG_H_*/
|
||||
#endif /*_TD_VNODE_WRITE_H_*/
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnodeInt.h"
|
||||
|
||||
int32_t vnodeInit() { return 0; }
|
||||
void vnodeCleanup() {}
|
||||
|
||||
int32_t vnodeGetStatistics(SVnode *pVnode, SVnodeStatisic *pStat) { return 0; }
|
||||
int32_t vnodeGetStatus(SVnode *pVnode, SVnodeStatus *pStatus) { return 0; }
|
||||
|
||||
SVnode *vnodeOpen(int32_t vgId, const char *path) { return NULL; }
|
||||
void vnodeClose(SVnode *pVnode) {}
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
|
||||
SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; }
|
||||
int32_t vnodeDrop(SVnode *pVnode) { return 0; }
|
||||
int32_t vnodeCompact(SVnode *pVnode) { return 0; }
|
||||
int32_t vnodeSync(SVnode *pVnode) { return 0; }
|
||||
|
||||
int32_t vnodeProcessMsg(SVnode *pVnode, SVnodeMsg *pMsg) { return 0; }
|
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnodeRead.h"
|
|
@ -0,0 +1,17 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnodeWrite.h"
|
|
@ -6,4 +6,4 @@ target_sources(VMATest
|
|||
"vnodeMemAllocatorTest.cpp"
|
||||
)
|
||||
target_include_directories(VMATest PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc")
|
||||
target_link_libraries(VMATest os gtest_main)
|
||||
target_link_libraries(VMATest os gtest_main vnode)
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue