[TD-10430] add dnode trans file

This commit is contained in:
Shengliang Guan 2021-10-25 10:38:15 +08:00
parent 7bcfb460ce
commit 3b1add19f1
11 changed files with 191 additions and 174 deletions

View File

@ -46,6 +46,16 @@ typedef struct {
*/ */
void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell); void (*SendRedirectMsg)(struct SRpcMsg *rpcMsg, bool forShell);
/**
* Get the corresponding endpoint information from dnodeId.
*
* @param dnode, the instance of dDnode module.
* @param dnodeId, the id ot dnode.
* @param ep, the endpoint of dnode.
* @param fqdn, the fqdn of dnode.
* @param port, the port of dnode.
*/
void (*GetDnodeEp)(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
} SMnodeFp; } SMnodeFp;
typedef struct { typedef struct {

View File

@ -68,6 +68,7 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_DATA_NULL_STR "NULL" #define TSDB_DATA_NULL_STR "NULL"
#define TSDB_DATA_NULL_STR_L "null" #define TSDB_DATA_NULL_STR_L "null"
#define TSDB_NETTEST_USER "nettestinternal"
#define TSDB_DEFAULT_USER "root" #define TSDB_DEFAULT_USER "root"
#ifdef _TD_POWER_ #ifdef _TD_POWER_
#define TSDB_DEFAULT_PASS "powerdb" #define TSDB_DEFAULT_PASS "powerdb"

View File

@ -19,7 +19,9 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "trpc.h" #include "trpc.h"
#include "dnode.h" #include "dnode.h"

View File

@ -22,10 +22,10 @@ extern "C" {
#include "dnodeInt.h" #include "dnodeInt.h"
typedef enum { typedef enum {
TD_RUN_STAT_INIT, DN_RUN_STAT_INIT,
TD_RUN_STAT_RUNNING, DN_RUN_STAT_RUNNING,
TD_RUN_STAT_STOPPED DN_RUN_STAT_STOPPED
} RunStat; } EDnRunStat;
int32_t dnodeInitMain(); int32_t dnodeInitMain();
void dnodeCleanupMain(); void dnodeCleanupMain();
@ -36,7 +36,7 @@ void dnodeReportStartupFinished(char *name, char *desc);
void dnodeProcessStartupReq(SRpcMsg *pMsg); void dnodeProcessStartupReq(SRpcMsg *pMsg);
void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg); void dnodeProcessCreateMnodeReq(SRpcMsg *pMsg);
void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg); void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg);
RunStat dnodeGetRunStat(); EDnRunStat dnodeGetRunStat();
void dnodeSetRunStat(); void dnodeSetRunStat();
void* dnodeGetTimer(); void* dnodeGetTimer();

View File

@ -25,7 +25,6 @@ int32_t dnodeInitTrans();
void dnodeCleanupTrans(); void dnodeCleanupTrans();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg); void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -46,6 +46,7 @@ static int32_t dnodeInitVnodeModule(void **unused) {
static int32_t dnodeInitMnodeModule(void **unused) { static int32_t dnodeInitMnodeModule(void **unused) {
SMnodePara para; SMnodePara para;
para.fp.GetDnodeEp = dnodeGetDnodeEp;
para.fp.SendMsgToDnode = dnodeSendMsgToDnode; para.fp.SendMsgToDnode = dnodeSendMsgToDnode;
para.fp.SendMsgToMnode = dnodeSendMsgToMnode; para.fp.SendMsgToMnode = dnodeSendMsgToMnode;
para.fp.SendRedirectMsg = dnodeSendRedirectMsg; para.fp.SendRedirectMsg = dnodeSendRedirectMsg;
@ -77,7 +78,7 @@ int32_t dnodeInit() {
taosStepExec(tsSteps); taosStepExec(tsSteps);
dnodeSetRunStat(TD_RUN_STAT_RUNNING); dnodeSetRunStat(DN_RUN_STAT_RUNNING);
dnodeReportStartupFinished("TDengine", "initialized successfully"); dnodeReportStartupFinished("TDengine", "initialized successfully");
dInfo("TDengine is initialized successfully"); dInfo("TDengine is initialized successfully");
@ -85,8 +86,8 @@ int32_t dnodeInit() {
} }
void dnodeCleanup() { void dnodeCleanup() {
if (dnodeGetRunStat() != TD_RUN_STAT_STOPPED) { if (dnodeGetRunStat() != DN_RUN_STAT_STOPPED) {
dnodeSetRunStat(TD_RUN_STAT_STOPPED); dnodeSetRunStat(DN_RUN_STAT_STOPPED);
taosStepCleanup(tsSteps); taosStepCleanup(tsSteps);
tsSteps = NULL; tsSteps = NULL;
} }

View File

@ -29,7 +29,7 @@
#include "mnode.h" #include "mnode.h"
static struct { static struct {
RunStat runStatus; EDnRunStat runStatus;
void * dnodeTimer; void * dnodeTimer;
SStartupStep startup; SStartupStep startup;
} tsDmain; } tsDmain;
@ -55,7 +55,7 @@ static void dnodeCheckDataDirOpenned(char *dir) {
} }
int32_t dnodeInitMain() { int32_t dnodeInitMain() {
tsDmain.runStatus = TD_RUN_STAT_STOPPED; tsDmain.runStatus = DN_RUN_STAT_STOPPED;
tsDmain.dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR"); tsDmain.dnodeTimer = taosTmrInit(100, 200, 60000, "DND-TMR");
if (tsDmain.dnodeTimer == NULL) { if (tsDmain.dnodeTimer == NULL) {
dError("failed to init dnode timer"); dError("failed to init dnode timer");
@ -260,8 +260,8 @@ void dnodeProcessConfigDnodeReq(SRpcMsg *pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
RunStat dnodeGetRunStat() { return tsDmain.runStatus; } EDnRunStat dnodeGetRunStat() { return tsDmain.runStatus; }
void dnodeSetRunStat(RunStat stat) { tsDmain.runStatus = stat; } void dnodeSetRunStat(EDnRunStat stat) { tsDmain.runStatus = stat; }
void* dnodeGetTimer() { return tsDmain.dnodeTimer; } void* dnodeGetTimer() { return tsDmain.dnodeTimer; }

View File

@ -20,21 +20,19 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "dnodeTrans.h"
#include "tglobal.h"
#include "dnodeMain.h" #include "dnodeMain.h"
#include "dnodeMnodeEps.h" #include "dnodeMnodeEps.h"
#include "dnodeStatus.h" #include "dnodeStatus.h"
#include "dnodeTrans.h"
#include "vnode.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h"
typedef void (*RpcMsgFp)( SRpcMsg *pMsg); typedef void (*RpcMsgFp)(SRpcMsg *pMsg);
static struct { static struct {
void * serverRpc; void *serverRpc;
void * clientRpc; void *clientRpc;
void * shellRpc; void *shellRpc;
int32_t queryReqNum; int32_t queryReqNum;
int32_t submitReqNum; int32_t submitReqNum;
RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX];
@ -43,18 +41,18 @@ static struct {
static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
int32_t msgType = pMsg->msgType;
if (pMsg->pCont == NULL) return; if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
if (pMsg->msgType == TSDB_MSG_TYPE_NETWORK_TEST) {
dnodeProcessStartupReq(pMsg); dnodeProcessStartupReq(pMsg);
return; return;
} }
if (dnodeGetRunStat() != TD_RUN_STAT_RUNNING) { if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
rspMsg.code = TSDB_CODE_APP_NOT_READY; rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
dTrace("RPC %p, msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
return; return;
} }
@ -64,38 +62,40 @@ static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
return; return;
} }
RpcMsgFp fp = tsTrans.peerMsgFp[pMsg->msgType]; RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg); (*fp)(pMsg);
} else { } else {
dDebug("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]);
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
} }
int32_t dnodeInitServer() { static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
@ -117,7 +117,7 @@ int32_t dnodeInitServer() {
return 0; return 0;
} }
void dnodeCleanupServer() { static void dnodeCleanupServer() {
if (tsTrans.serverRpc) { if (tsTrans.serverRpc) {
rpcClose(tsTrans.serverRpc); rpcClose(tsTrans.serverRpc);
tsTrans.serverRpc = NULL; tsTrans.serverRpc = NULL;
@ -125,65 +125,66 @@ void dnodeCleanupServer() {
} }
} }
static void dnodeProcessRspFromPeer(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
if (dnodeGetRunStat() == TD_RUN_STAT_STOPPED) { int32_t msgType = pMsg->msgType;
if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
if (pMsg == NULL || pMsg->pCont == NULL) return; if (pMsg == NULL || pMsg->pCont == NULL) return;
dTrace("msg:%p is ignored since dnode is stopping", pMsg); dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
} }
if (pMsg->msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) {
dnodeUpdateMnodeFromPeer(pEpSet); dnodeUpdateMnodeFromPeer(pEpSet);
} }
RpcMsgFp fp = tsTrans.peerMsgFp[pMsg->msgType]; RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg); (*fp)(pMsg);
} else { } else {
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rspMsg);
} }
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
int32_t dnodeInitClient() { static int32_t dnodeInitClient() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
char secret[TSDB_KEY_LEN] = "secret"; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp;
char secret[TSDB_KEY_LEN] = "secret";
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND-C"; rpcInit.label = "DND-C";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dnodeProcessRspFromPeer; rpcInit.cfp = dnodeProcessPeerRsp;
rpcInit.sessions = TSDB_MAX_VNODES << 4; rpcInit.sessions = TSDB_MAX_VNODES << 4;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "t"; rpcInit.user = "t";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.secret = secret; rpcInit.secret = secret;
tsTrans.clientRpc = rpcOpen(&rpcInit); tsTrans.clientRpc = rpcOpen(&rpcInit);
if (tsTrans.clientRpc == NULL) { if (tsTrans.clientRpc == NULL) {
@ -195,7 +196,7 @@ int32_t dnodeInitClient() {
return 0; return 0;
} }
void dnodeCleanupClient() { static void dnodeCleanupClient() {
if (tsTrans.clientRpc) { if (tsTrans.clientRpc) {
rpcClose(tsTrans.clientRpc); rpcClose(tsTrans.clientRpc);
tsTrans.clientRpc = NULL; tsTrans.clientRpc = NULL;
@ -203,59 +204,50 @@ void dnodeCleanupClient() {
} }
} }
static void dnodeProcessMsgFromShell(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
SRpcMsg rpcMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0};
int32_t msgType = pMsg->msgType;
if (pMsg->pCont == NULL) return; if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) {
if (dnodeGetRunStat() == TD_RUN_STAT_STOPPED) { dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]);
dError("RPC %p, shell msg:%s is ignored since dnode exiting", pMsg->handle, taosMsg[pMsg->msgType]); rspMsg.code = TSDB_CODE_DND_EXITING;
rpcMsg.code = TSDB_CODE_DND_EXITING; rpcSendResponse(&rspMsg);
rpcSendResponse(&rpcMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
} else if (dnodeGetRunStat() != TD_RUN_STAT_RUNNING) { } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) {
dError("RPC %p, shell msg:%s is ignored since dnode not running", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]);
rpcMsg.code = TSDB_CODE_APP_NOT_READY; rspMsg.code = TSDB_CODE_APP_NOT_READY;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
return; return;
} }
if (pMsg->msgType == TSDB_MSG_TYPE_QUERY) { if (pMsg->pCont == NULL) {
atomic_fetch_add_32(&tsTrans.queryReqNum, 1); rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN;
} else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { rpcSendResponse(&rspMsg);
atomic_fetch_add_32(&tsTrans.submitReqNum, 1); return;
} else {} }
RpcMsgFp fp = tsTrans.shellMsgFp[pMsg->msgType]; if (msgType == TSDB_MSG_TYPE_QUERY) {
atomic_fetch_add_32(&tsTrans.queryReqNum, 1);
} else if (msgType == TSDB_MSG_TYPE_SUBMIT) {
atomic_fetch_add_32(&tsTrans.submitReqNum, 1);
} else {
}
RpcMsgFp fp = tsTrans.shellMsgFp[msgType];
if (fp != NULL) { if (fp != NULL) {
dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]);
(*fp)(pMsg); (*fp)(pMsg);
} else { } else {
dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[pMsg->msgType]); dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]);
rpcMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rspMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
} }
static int32_t dnodeAuthNetTest(char *user, char *spi, char *encrypt, char *secret, char *ckey) { void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
if (strcmp(user, "nettestinternal") == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pass, TSDB_KEY_LEN);
dTrace("nettest user is authorized");
return 0;
}
return -1;
}
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) {
rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
}
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
@ -263,19 +255,13 @@ void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
dnodeSendMsgToDnode(&epSet, rpcMsg); dnodeSendMsgToDnode(&epSet, rpcMsg);
} }
void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) {
SRpcEpSet epSet = {0}; SRpcEpSet epSet = {0};
dnodeGetEpSetForPeer(&epSet); dnodeGetEpSetForPeer(&epSet);
rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp);
} }
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp, SRpcEpSet *epSet) {
rpcSendRecv(tsTrans.clientRpc, epSet, rpcMsg, rpcRsp);
}
static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (dnodeAuthNetTest(user, spi, encrypt, secret, ckey) == 0) return 0;
int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey);
if (code != TSDB_CODE_APP_NOT_READY) return code; if (code != TSDB_CODE_APP_NOT_READY) return code;
@ -306,52 +292,52 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
return rpcRsp.code; return rpcRsp.code;
} }
int32_t dnodeInitShell() { static int32_t dnodeInitShell() {
tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg;
// the following message shall be treated as mnode query // the following message shall be treated as mnode query
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg;
tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq;
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
@ -360,14 +346,14 @@ int32_t dnodeInitShell() {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = tsDnodeShellPort; rpcInit.localPort = tsDnodeShellPort;
rpcInit.label = "SHELL"; rpcInit.label = "SHELL";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dnodeProcessMsgFromShell; rpcInit.cfp = dnodeProcessShellReq;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = dnodeRetrieveUserAuthInfo; rpcInit.afp = dnodeRetrieveUserAuthInfo;
tsTrans.shellRpc = rpcOpen(&rpcInit); tsTrans.shellRpc = rpcOpen(&rpcInit);
if (tsTrans.shellRpc == NULL) { if (tsTrans.shellRpc == NULL) {
@ -379,7 +365,7 @@ int32_t dnodeInitShell() {
return 0; return 0;
} }
void dnodeCleanupShell() { static void dnodeCleanupShell() {
if (tsTrans.shellRpc) { if (tsTrans.shellRpc) {
rpcClose(tsTrans.shellRpc); rpcClose(tsTrans.shellRpc);
tsTrans.shellRpc = NULL; tsTrans.shellRpc = NULL;

View File

@ -30,6 +30,7 @@ EMnStatus mnodeGetStatus();
void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg); void mnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg); void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell); void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell);
void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -15,7 +15,22 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "mnodeInt.h" #include "mnodeAuth.h"
int32_t mnodeInitAuth() { return 0; } int32_t mnodeInitAuth() { return 0; }
void mnodeCleanupAuth() {} void mnodeCleanupAuth() {}
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, TSDB_NETTEST_USER) == 0) {
char pass[32] = {0};
taosEncryptPass((uint8_t *)user, strlen(user), pass);
*spi = 0;
*encrypt = 0;
*ckey = 0;
memcpy(secret, pass, TSDB_KEY_LEN);
mDebug("nettest user is authorized");
return 0;
}
return 0;
}

View File

@ -61,6 +61,10 @@ void mnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsMint.fp.SendMsgToM
void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.fp.SendRedirectMsg)(rpcMsg, forShell); } void mnodeSendRedirectMsg(struct SRpcMsg *rpcMsg, bool forShell) { (*tsMint.fp.SendRedirectMsg)(rpcMsg, forShell); }
void mnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) {
(*tsMint.fp.GetDnodeEp)(dnodeId, ep, fqdn, port);
}
int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; } int32_t mnodeGetStatistics(SMnodeStat *stat) { return 0; }
static int32_t mnodeSetPara(SMnodePara para) { static int32_t mnodeSetPara(SMnodePara para) {
@ -242,5 +246,3 @@ void mnodeCleanup() {
mInfo("mnode is cleaned up"); mInfo("mnode is cleaned up");
} }
} }
int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; }