for cluster module
This commit is contained in:
parent
22a6eeb528
commit
3a056eddea
|
@ -30,12 +30,10 @@
|
|||
void (*dnodeInitMgmtIpFp)() = NULL;
|
||||
int32_t (*dnodeInitMgmtFp)() = NULL;
|
||||
void (*dnodeCleanUpMgmtFp)() = NULL;
|
||||
|
||||
void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
|
||||
void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
|
||||
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
|
||||
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
|
||||
|
||||
|
||||
static void *tsStatusTimer = NULL;
|
||||
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
|
||||
static void dnodeInitProcessShellMsg();
|
||||
|
|
|
@ -595,7 +595,6 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
int32_t numOfVnodes;
|
||||
SDnodeState dnodeState;
|
||||
SRpcIpSet ipList;
|
||||
SVnodeAccess vnodeAccess[];
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_MGMT_CONN_H
|
||||
#define TDENGINE_MGMT_CONN_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "mnode.h"
|
||||
|
||||
int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn);
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle);
|
||||
void mgmtSaveQhandle(void *qhandle);
|
||||
void mgmtFreeQhandle(void *qhandle);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -53,6 +53,9 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg);
|
|||
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
|
||||
SDnodeObj* mgmtGetDnode(uint32_t ip);
|
||||
|
||||
extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip);
|
||||
extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries);
|
|||
int32_t mgmtCheckTimeSeries(uint32_t timeseries);
|
||||
int32_t mgmtCheckUserGrant();
|
||||
int32_t mgmtCheckDbGrant();
|
||||
int32_t mgmtCheckDnodeGrant();
|
||||
int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
|
|
|
@ -24,7 +24,10 @@ extern "C" {
|
|||
#include <stdbool.h>
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtInitMnodes();
|
||||
void mgmtCleanUpMnodes();
|
||||
|
||||
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -22,20 +22,22 @@ extern "C" {
|
|||
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
|
||||
int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
|
||||
int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
bool mgmtCheckQhandle(uint64_t qhandle);
|
||||
void mgmtSaveQhandle(void *qhandle);
|
||||
void mgmtFreeQhandle(void *qhandle);
|
||||
|
||||
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg);
|
||||
int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
|
||||
int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||
|
||||
int32_t mgmtKillQuery(char *qidstr, void *pConn);
|
||||
|
||||
int32_t mgmtKillStream(char *qidstr, void *pConn);
|
||||
|
||||
int32_t mgmtKillConnection(char *qidstr, void *pConn);
|
||||
|
||||
enum {
|
||||
|
|
|
@ -28,10 +28,8 @@ int32_t mgmtInitShell();
|
|||
void mgmtCleanUpShell();
|
||||
|
||||
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
|
||||
extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
|
||||
extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
|
||||
extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
|
||||
extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
|
||||
|
||||
/*
|
||||
* If table not exist, will create it
|
||||
|
|
|
@ -1,153 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "mgmtConn.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tschemautil.h"
|
||||
|
||||
typedef struct {
|
||||
char user[TSDB_TABLE_ID_LEN];
|
||||
uint64_t stime;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
} SConnInfo;
|
||||
|
||||
typedef struct {
|
||||
int numOfConns;
|
||||
int index;
|
||||
SConnInfo connInfo[];
|
||||
} SConnShow;
|
||||
|
||||
int mgmtGetConns(SShowObj *pShow, void *pConn) {
|
||||
// SAcctObj * pAcct = pConn->pAcct;
|
||||
// SConnShow *pConnShow;
|
||||
//
|
||||
// pthread_mutex_lock(&pAcct->mutex);
|
||||
//
|
||||
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
|
||||
// pConnShow->index = 0;
|
||||
// pConnShow->numOfConns = 0;
|
||||
//
|
||||
// if (pAcct->acctInfo.numOfConns > 0) {
|
||||
// pConn = pAcct->pConn;
|
||||
// SConnInfo *pConnInfo = pConnShow->connInfo;
|
||||
//
|
||||
// while (pConn && pConn->pUser) {
|
||||
// strcpy(pConnInfo->user, pConn->pUser->user);
|
||||
// pConnInfo->ip = pConn->ip;
|
||||
// pConnInfo->port = pConn->port;
|
||||
// pConnInfo->stime = pConn->stime;
|
||||
//
|
||||
// pConnShow->numOfConns++;
|
||||
// pConnInfo++;
|
||||
// pConn = pConn->next;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pthread_mutex_unlock(&pAcct->mutex);
|
||||
//
|
||||
// // sorting based on useconds
|
||||
//
|
||||
// pShow->pNode = pConnShow;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
|
||||
int cols = 0;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
|
||||
SSchema *pSchema = tsGetSchema(pMeta);
|
||||
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "user");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "login time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htons(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
|
||||
pShow->numOfRows = 1000000;
|
||||
pShow->pNode = NULL;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
mgmtGetConns(pShow, pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) {
|
||||
int numOfRows = 0;
|
||||
char *pWrite;
|
||||
int cols = 0;
|
||||
|
||||
SConnShow *pConnShow = (SConnShow *)pShow->pNode;
|
||||
|
||||
if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
SConnInfo *pNode = pConnShow->connInfo + pConnShow->index;
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, pNode->user);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
uint32_t ip = pNode->ip;
|
||||
sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port));
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int64_t *)pWrite = pNode->stime;
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
pConnShow->index++;
|
||||
}
|
||||
|
||||
if (numOfRows == 0) {
|
||||
tfree(pConnShow);
|
||||
}
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void mgmtSaveQhandle(void *qhandle) {
|
||||
}
|
||||
|
||||
void mgmtFreeQhandle(void *qhandle) {
|
||||
}
|
|
@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL;
|
|||
int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
|
||||
int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL;
|
||||
void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL;
|
||||
int32_t (*mgmtCreateDnodeFp)(uint32_t ip) = NULL;
|
||||
int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip) = NULL;
|
||||
|
||||
|
||||
static SDnodeObj tsDnodeObj = {0};
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
|
||||
int32_t (*mgmtCheckUserGrantFp)() = NULL;
|
||||
int32_t (*mgmtCheckDbGrantFp)() = NULL;
|
||||
int32_t (*mgmtCheckDnodeGrantFp)() = NULL;
|
||||
void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
|
||||
void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
|
||||
int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL;
|
||||
|
@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL;
|
|||
|
||||
int32_t mgmtCheckUserGrant() {
|
||||
if (mgmtCheckUserGrantFp) {
|
||||
return mgmtCheckUserGrantFp();
|
||||
return (*mgmtCheckUserGrantFp)();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() {
|
|||
|
||||
int32_t mgmtCheckDbGrant() {
|
||||
if (mgmtCheckDbGrantFp) {
|
||||
return mgmtCheckDbGrantFp();
|
||||
return (*mgmtCheckDbGrantFp)();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mgmtCheckDnodeGrant() {
|
||||
if (mgmtCheckDnodeGrantFp) {
|
||||
return (*mgmtCheckDnodeGrantFp)();
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() {
|
|||
void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
|
||||
pAcct->acctInfo.numOfTimeSeries += timeSeriesNum;
|
||||
if (mgmtAddTimeSeriesFp) {
|
||||
mgmtAddTimeSeriesFp(timeSeriesNum);
|
||||
(*mgmtAddTimeSeriesFp)(timeSeriesNum);
|
||||
}
|
||||
}
|
||||
|
||||
void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
|
||||
pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum;
|
||||
if (mgmtRestoreTimeSeriesFp) {
|
||||
mgmtRestoreTimeSeriesFp(timeSeriesNum);
|
||||
(*mgmtRestoreTimeSeriesFp)(timeSeriesNum);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mgmtCheckTimeSeries(uint32_t timeseries) {
|
||||
if (mgmtCheckTimeSeriesFp) {
|
||||
return mgmtCheckTimeSeriesFp(timeseries);
|
||||
return (*mgmtCheckTimeSeriesFp)(timeseries);
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -18,12 +18,32 @@
|
|||
#include "mgmtMnode.h"
|
||||
#include "mgmtUser.h"
|
||||
|
||||
void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
|
||||
int32_t (*mgmtInitMnodesFp)() = NULL;
|
||||
void (*mgmtCleanUpMnodesFp)() = NULL;
|
||||
int32_t (*mgmtGetMnodesNumFp)() = NULL;
|
||||
void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
|
||||
|
||||
static int32_t mgmtGetMnodesNum();
|
||||
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode);
|
||||
static int32_t mgmtGetMnodesNum() {
|
||||
if (mgmtGetMnodesNumFp) {
|
||||
return (*mgmtGetMnodesNumFp)();
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
|
||||
if (mgmtGetNextMnodeFp) {
|
||||
return (*mgmtGetNextMnodeFp)(pShow, pMnode);
|
||||
} else {
|
||||
if (*pMnode == NULL) {
|
||||
*pMnode = NULL;
|
||||
} else {
|
||||
*pMnode = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return *pMnode;
|
||||
}
|
||||
|
||||
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
|
||||
int32_t cols = 0;
|
||||
|
@ -88,11 +108,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
|
|||
char ipstr[20];
|
||||
|
||||
while (numOfRows < rows) {
|
||||
pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode);
|
||||
|
||||
|
||||
// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode);
|
||||
// if (pMnode == NULL) break;
|
||||
pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode);
|
||||
if (pMnode == NULL) break;
|
||||
|
||||
cols = 0;
|
||||
|
||||
|
@ -123,25 +140,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
|
|||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
||||
static int32_t mgmtGetMnodesNum() {
|
||||
if (mgmtGetMnodesNumFp) {
|
||||
return mgmtGetMnodesNumFp();
|
||||
} else {
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
|
||||
if (mgmtGetNextMnodeFp) {
|
||||
return mgmtGetNextMnodeFp(pShow, pMnode);
|
||||
} else {
|
||||
if (*pMnode == NULL) {
|
||||
*pMnode = NULL;
|
||||
} else {
|
||||
*pMnode = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return *pMnode;
|
||||
}
|
|
@ -15,16 +15,27 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mnode.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tschemautil.h"
|
||||
#include "mgmtProfile.h"
|
||||
|
||||
typedef struct {
|
||||
char user[TSDB_TABLE_ID_LEN + 1];
|
||||
uint64_t stime;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
} SConnInfo;
|
||||
|
||||
typedef struct {
|
||||
int numOfConns;
|
||||
int index;
|
||||
SConnInfo connInfo[];
|
||||
} SConnShow;
|
||||
|
||||
typedef struct {
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
char user[TSDB_TABLE_ID_LEN];
|
||||
char user[TSDB_TABLE_ID_LEN+ 1];
|
||||
} SCDesc;
|
||||
|
||||
typedef struct {
|
||||
|
@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) {
|
|||
|
||||
return TSDB_CODE_INVALID_CONNECTION;
|
||||
}
|
||||
|
||||
bool mgmtCheckQhandle(uint64_t qhandle) {
|
||||
return true;
|
||||
}
|
||||
|
||||
void mgmtSaveQhandle(void *qhandle) {
|
||||
}
|
||||
|
||||
void mgmtFreeQhandle(void *qhandle) {
|
||||
}
|
||||
|
||||
int mgmtGetConns(SShowObj *pShow, void *pConn) {
|
||||
// SAcctObj * pAcct = pConn->pAcct;
|
||||
// SConnShow *pConnShow;
|
||||
//
|
||||
// pthread_mutex_lock(&pAcct->mutex);
|
||||
//
|
||||
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
|
||||
// pConnShow->index = 0;
|
||||
// pConnShow->numOfConns = 0;
|
||||
//
|
||||
// if (pAcct->acctInfo.numOfConns > 0) {
|
||||
// pConn = pAcct->pConn;
|
||||
// SConnInfo *pConnInfo = pConnShow->connInfo;
|
||||
//
|
||||
// while (pConn && pConn->pUser) {
|
||||
// strcpy(pConnInfo->user, pConn->pUser->user);
|
||||
// pConnInfo->ip = pConn->ip;
|
||||
// pConnInfo->port = pConn->port;
|
||||
// pConnInfo->stime = pConn->stime;
|
||||
//
|
||||
// pConnShow->numOfConns++;
|
||||
// pConnInfo++;
|
||||
// pConn = pConn->next;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pthread_mutex_unlock(&pAcct->mutex);
|
||||
//
|
||||
// // sorting based on useconds
|
||||
//
|
||||
// pShow->pNode = pConnShow;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
|
||||
int32_t cols = 0;
|
||||
|
||||
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
|
||||
SSchema *pSchema = tsGetSchema(pMeta);
|
||||
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "user");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "ip:port");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 8;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema[cols].name, "login time");
|
||||
pSchema[cols].bytes = htons(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = htons(cols);
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
pShow->offset[0] = 0;
|
||||
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
|
||||
pShow->numOfRows = 1000000;
|
||||
pShow->pNode = NULL;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
mgmtGetConns(pShow, pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
||||
int32_t numOfRows = 0;
|
||||
char *pWrite;
|
||||
int32_t cols = 0;
|
||||
|
||||
SConnShow *pConnShow = (SConnShow *)pShow->pNode;
|
||||
|
||||
if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index;
|
||||
|
||||
while (numOfRows < rows) {
|
||||
SConnInfo *pNode = pConnShow->connInfo + pConnShow->index;
|
||||
cols = 0;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, pNode->user);
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
uint32_t ip = pNode->ip;
|
||||
sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port));
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int64_t *)pWrite = pNode->stime;
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
pConnShow->index++;
|
||||
}
|
||||
|
||||
if (numOfRows == 0) {
|
||||
tfree(pConnShow);
|
||||
}
|
||||
|
||||
pShow->numOfReads += numOfRows;
|
||||
return numOfRows;
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@
|
|||
#include "mgmtAcct.h"
|
||||
#include "mgmtBalance.h"
|
||||
#include "mgmtChildTable.h"
|
||||
#include "mgmtConn.h"
|
||||
#include "mgmtDb.h"
|
||||
#include "mgmtDnode.h"
|
||||
#include "mgmtDnodeInt.h"
|
||||
|
@ -1164,10 +1163,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle)
|
|||
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
|
||||
}
|
||||
|
||||
void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
|
||||
void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
|
||||
void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
|
||||
void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
|
||||
|
||||
static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
|
||||
if (!mgmtAlterAcctFp) {
|
||||
|
@ -1297,6 +1294,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle
|
|||
rpcSendResponse(ahandle, code, NULL, 0);
|
||||
}
|
||||
|
||||
static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
|
||||
if (!mgmtCreateDnodeFp) {
|
||||
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont;
|
||||
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to create dnode:%s, redirect this message", pCreate->ip);
|
||||
return;
|
||||
}
|
||||
|
||||
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
|
||||
if (pUser == NULL) {
|
||||
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER));
|
||||
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcmp(pUser->user, "root") != 0) {
|
||||
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS));
|
||||
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t code = (*mgmtCreateDnodeFp)(inet_addr(pCreate->ip));
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("dnode:%s is created by %s", pCreate->ip, pUser->user);
|
||||
} else {
|
||||
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code));
|
||||
}
|
||||
|
||||
rpcSendResponse(ahandle, code, NULL, 0);
|
||||
}
|
||||
|
||||
static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
|
||||
if (!mgmtDropDnodeByIpFp) {
|
||||
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont;
|
||||
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to drop dnode:%s, redirect this message", pDrop->ip);
|
||||
return;
|
||||
}
|
||||
|
||||
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
|
||||
if (pUser == NULL) {
|
||||
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER));
|
||||
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (strcmp(pUser->user, "root") != 0) {
|
||||
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS));
|
||||
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t code = (*mgmtDropDnodeByIpFp)(inet_addr(pDrop->ip));
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("dnode:%s set to removing state by %s", pDrop->ip, pUser->user);
|
||||
} else {
|
||||
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code));
|
||||
}
|
||||
|
||||
rpcSendResponse(ahandle, code, NULL, 0);
|
||||
}
|
||||
|
||||
void mgmtInitProcessShellMsg() {
|
||||
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
|
||||
mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
|
||||
|
|
Loading…
Reference in New Issue