server in mgmtShell

This commit is contained in:
slguan 2020-02-22 10:37:32 +08:00
parent cbbe8c16dc
commit a241006dd0
11 changed files with 324 additions and 421 deletions

View File

@ -325,7 +325,7 @@ typedef struct _sql_obj {
int64_t stime; int64_t stime;
uint32_t queryId; uint32_t queryId;
void * thandle; void * thandle;
SRpcIpSet ipSet; SRpcIpSet *ipSet;
void * pStream; void * pStream;
void * pSubscription; void * pSubscription;
char * sqlstr; char * sqlstr;

View File

@ -209,16 +209,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
} }
char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) { char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
SQList *pQList = (SQList *)pMsg; SCMQqueryList *pQList = (SCMQqueryList *)pMsg;
char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256; char * pMax = pMsg + TSDB_PAYLOAD_SIZE - 256;
SQDesc *pQdesc = pQList->qdesc; SCMQueryDesc *pQdesc = pQList->qdesc;
pQList->numOfQueries = 0; pQList->numOfQueries = 0;
// We extract the lock to tscBuildHeartBeatMsg function. // We extract the lock to tscBuildHeartBeatMsg function.
/* pthread_mutex_lock (&pObj->mutex); */ /* pthread_mutex_lock (&pObj->mutex); */
pMsg += sizeof(SQList); pMsg += sizeof(SCMQqueryList);
SSqlObj *pSql = pObj->sqlList; SSqlObj *pSql = pObj->sqlList;
while (pSql) { while (pSql) {
/* /*
@ -239,15 +239,15 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pQList->numOfQueries++; pQList->numOfQueries++;
pQdesc++; pQdesc++;
pSql = pSql->next; pSql = pSql->next;
pMsg += sizeof(SQDesc); pMsg += sizeof(SCMQueryDesc);
if (pMsg > pMax) break; if (pMsg > pMax) break;
} }
SSList *pSList = (SSList *)pMsg; SCMStreamList *pSList = (SCMStreamList *)pMsg;
SSDesc *pSdesc = pSList->sdesc; SCMStreamDesc *pSdesc = pSList->sdesc;
pSList->numOfStreams = 0; pSList->numOfStreams = 0;
pMsg += sizeof(SSList); pMsg += sizeof(SCMStreamList);
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1); strncpy(pSdesc->sql, pStream->pSql->sqlstr, TSDB_SHOW_SQL_LEN - 1);
@ -265,7 +265,7 @@ char *tscBuildQueryStreamDesc(char *pMsg, STscObj *pObj) {
pSList->numOfStreams++; pSList->numOfStreams++;
pSdesc++; pSdesc++;
pStream = pStream->next; pStream = pStream->next;
pMsg += sizeof(SSDesc); pMsg += sizeof(SCMStreamDesc);
if (pMsg > pMax) break; if (pMsg > pMax) break;
} }

View File

@ -31,12 +31,10 @@
#define TSC_MGMT_VNODE 999 #define TSC_MGMT_VNODE 999
SRpcIpSet tscMgmtIpList;
int tsMasterIndex = 0; int tsMasterIndex = 0;
int tsSlaveIndex = 1; int tsSlaveIndex = 1;
//temp SRpcIpSet tscMgmtIpList;
SRpcIpSet tscMgmtIpSet;
SRpcIpSet tscDnodeIpSet; SRpcIpSet tscDnodeIpSet;
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0}; int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
@ -62,29 +60,27 @@ void tscPrintMgmtIp() {
} }
} }
void tscSetMgmtIpListFromCluster(SIpList *pIpList) { void tscSetMgmtIpListFromCluster(SRpcIpSet *pIpList) {
tscMgmtIpList.numOfIps = pIpList->numOfIps; tscMgmtIpList.numOfIps = htons(pIpList->numOfIps);
if (memcmp(tscMgmtIpList.ip, pIpList->ip, pIpList->numOfIps * 4) != 0) { tscMgmtIpList.index = htons(pIpList->index);
for (int i = 0; i < pIpList->numOfIps; ++i) { tscMgmtIpList.port = htons(pIpList->port);
//tinet_ntoa(tscMgmtIpList.ipStr[i], pIpList->ip[i]); for (int32_t i = 0; i <tscMgmtIpList.numOfIps; ++i) {
tscMgmtIpList.ip[i] = pIpList->ip[i]; tscMgmtIpList.ip[i] = pIpList->ip[i];
} }
tscTrace("cluster mgmt IP list:");
tscPrintMgmtIp();
}
} }
void tscSetMgmtIpListFromEdge() { void tscSetMgmtIpListFromEdge() {
if (tscMgmtIpList.numOfIps != 2) { if (tscMgmtIpList.numOfIps != 1) {
tscMgmtIpList.numOfIps = 2; tscMgmtIpList.numOfIps = 1;
tscMgmtIpList.index = 0;
tscMgmtIpList.port = tsMgmtShellPort;
tscMgmtIpList.ip[0] = inet_addr(tsMasterIp); tscMgmtIpList.ip[0] = inet_addr(tsMasterIp);
tscMgmtIpList.ip[1] = inet_addr(tsMasterIp);
tscTrace("edge mgmt IP list:"); tscTrace("edge mgmt IP list:");
tscPrintMgmtIp(); tscPrintMgmtIp();
} }
} }
void tscSetMgmtIpList(SIpList *pIpList) { void tscSetMgmtIpList(SRpcIpSet *pIpList) {
/* /*
* The iplist returned by the cluster edition is the current management nodes * The iplist returned by the cluster edition is the current management nodes
* and the iplist returned by the edge edition is empty * and the iplist returned by the edge edition is empty
@ -120,8 +116,8 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
if (code == 0) { if (code == 0) {
SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
SIpList * pIpList = &pRsp->ipList; SRpcIpSet * pIpList = &pRsp->ipList;
tscSetMgmtIpList(pIpList); tscSetMgmtIpList(pIpList);
if (pRsp->killConnection) { if (pRsp->killConnection) {
@ -296,7 +292,7 @@ void tscGetConnToVnode(SSqlObj *pSql, uint8_t *pCode) {
pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle); pVPeersDesc[pSql->index].ip, pSql->index, pSql->thandle);
//TODO fetch from vpeerdesc //TODO fetch from vpeerdesc
pSql->ipSet = tscMgmtIpSet; pSql->ipSet = &tscMgmtIpList;
break; break;
} }
@ -364,17 +360,17 @@ int tscSendMsgToServer(SSqlObj *pSql) {
} }
void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) { void tscProcessMgmtRedirect(SSqlObj *pSql, uint8_t *cont) {
SIpList *pIpList = (SIpList *)(cont); // SIpList *pIpList = (SIpList *)(cont);
tscSetMgmtIpList(pIpList); // tscSetMgmtIpList(pIpList);
//
if (pSql->cmd.command < TSDB_SQL_READ) { // if (pSql->cmd.command < TSDB_SQL_READ) {
tsMasterIndex = 0; // tsMasterIndex = 0;
pSql->index = 0; // pSql->index = 0;
} else { // } else {
pSql->index++; // pSql->index++;
} // }
//
tscPrintMgmtIp(); // tscPrintMgmtIp();
} }
void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
@ -2884,18 +2880,18 @@ int tscEstimateHeartBeatMsgLength(SSqlObj *pSql) {
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
size += tsRpcHeadSize + sizeof(SMgmtHead); size += tsRpcHeadSize + sizeof(SMgmtHead);
size += sizeof(SQList); size += sizeof(SCMQqueryList);
SSqlObj *tpSql = pObj->sqlList; SSqlObj *tpSql = pObj->sqlList;
while (tpSql) { while (tpSql) {
size += sizeof(SQDesc); size += sizeof(SCMQueryDesc);
tpSql = tpSql->next; tpSql = tpSql->next;
} }
size += sizeof(SSList); size += sizeof(SCMStreamList);
SSqlStream *pStream = pObj->streamList; SSqlStream *pStream = pObj->streamList;
while (pStream) { while (pStream) {
size += sizeof(SSDesc); size += sizeof(SCMStreamDesc);
pStream = pStream->next; pStream = pStream->next;
} }
@ -3323,10 +3319,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
assert(len <= tListLen(pObj->db)); assert(len <= tListLen(pObj->db));
strncpy(pObj->db, temp, tListLen(pObj->db)); strncpy(pObj->db, temp, tListLen(pObj->db));
SIpList * pIpList; // SIpList * pIpList;
char *rsp = pRes->pRsp + sizeof(SCMConnectRsp); // char *rsp = pRes->pRsp + sizeof(SCMConnectRsp);
pIpList = (SIpList *)rsp; // pIpList = (SIpList *)rsp;
tscSetMgmtIpList(pIpList); // tscSetMgmtIpList(pIpList);
strcpy(pObj->sversion, pConnect->serverVersion); strcpy(pObj->sversion, pConnect->serverVersion);
pObj->writeAuth = pConnect->writeAuth; pObj->writeAuth = pConnect->writeAuth;

View File

@ -306,8 +306,8 @@ typedef struct _connObj {
uint32_t ip; // shell IP uint32_t ip; // shell IP
uint16_t port; // shell port uint16_t port; // shell port
void * thandle; void * thandle;
SQList * pQList; // query list SCMQqueryList * pQList; // query list
SSList * pSList; // stream list SCMStreamList * pSList; // stream list
uint64_t qhandle; uint64_t qhandle;
struct _connObj *prev, *next; struct _connObj *prev, *next;
} SConnObj; } SConnObj;

View File

@ -37,8 +37,8 @@ extern int sdbExtConns;
extern int sdbMaster; extern int sdbMaster;
extern uint32_t sdbPublicIp; extern uint32_t sdbPublicIp;
extern uint32_t sdbMasterStartTime; extern uint32_t sdbMasterStartTime;
extern SIpList *pSdbIpList; extern SRpcIpSet *pSdbIpList;
extern SIpList *pSdbPublicIpList; extern SRpcIpSet *pSdbPublicIpList;
extern void (*sdbWorkAsMasterCallback)(); // this function pointer will be set by taosd extern void (*sdbWorkAsMasterCallback)(); // this function pointer will be set by taosd
@ -71,8 +71,6 @@ enum _sdbaction {
SDB_MAX_ACTION_TYPES SDB_MAX_ACTION_TYPES
}; };
#ifdef CLUSTER
#define SDB_MAX_PEERS 4 #define SDB_MAX_PEERS 4
typedef struct { typedef struct {
uint32_t ip; uint32_t ip;
@ -103,8 +101,6 @@ extern SSdbPeer *sdbPeer[];
#define sdbInited (sdbPeer[0]) #define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status) #define sdbStatus (sdbPeer[0]->status)
#endif
void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory, void *sdbOpenTable(int maxRows, int32_t maxRowSize, char *name, uint8_t keyType, char *directory,
void *(*appTool)(char, void *, char *, int, int *)); void *(*appTool)(char, void *, char *, int, int *));
@ -138,6 +134,7 @@ int sdbCfgNode(char *cont);
int64_t sdbGetVersion(); int64_t sdbGetVersion();
int32_t sdbGetRunStatus();
#define TSDB_MAX_TABLES 1000 #define TSDB_MAX_TABLES 1000
extern void* tsChildTableSdb; extern void* tsChildTableSdb;

View File

@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h" #include "taosdef.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosdef.h" #include "taosdef.h"
#include "trpc.h"
// message type // message type
#define TSDB_MSG_TYPE_REG 1 #define TSDB_MSG_TYPE_REG 1
@ -187,25 +188,8 @@ typedef enum {
extern char *taosMsg[]; extern char *taosMsg[];
#define TSDB_MSG_DEF_MAX_MPEERS 5
#define TSDB_MSG_DEF_VERSION_LEN 64
#define TSDB_MSG_DEF_DB_LEN 128
#define TSDB_MSG_DEF_USER_LEN 128
#define TSDB_MSG_DEF_TABLE_LEN 128
#define TSDB_MSG_DEF_ACCT_LEN 128
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct {
char numOfIps;
uint32_t ip[];
} SIpList;
typedef struct {
char numOfIps;
uint32_t ip[TSDB_MAX_MGMT_IPS];
} SMgmtIpList;
typedef struct { typedef struct {
uint32_t customerId; uint32_t customerId;
uint32_t osId; uint32_t osId;
@ -332,20 +316,17 @@ typedef struct {
} SAlterTableMsg; } SAlterTableMsg;
typedef struct { typedef struct {
char clientVersion[TSDB_MSG_DEF_VERSION_LEN]; char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_MSG_DEF_VERSION_LEN]; char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_MSG_DEF_DB_LEN]; char db[TSDB_TABLE_ID_LEN];
} SCMConnectMsg; } SCMConnectMsg;
typedef struct { typedef struct {
char acctId[TSDB_MSG_DEF_ACCT_LEN]; char acctId[TSDB_ACCT_LEN];
char serverVersion[TSDB_MSG_DEF_VERSION_LEN]; char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth; int8_t writeAuth;
int8_t superAuth; int8_t superAuth;
int16_t index; SRpcIpSet ipList;
int16_t numOfIps;
uint16_t port;
uint32_t ip[TSDB_MSG_DEF_MAX_MPEERS];
} SCMConnectRsp; } SCMConnectRsp;
typedef struct { typedef struct {
@ -799,19 +780,12 @@ typedef struct {
char config[60]; char config[60];
} SCfgMsg; } SCfgMsg;
typedef struct {
uint32_t queryId;
uint32_t streamId;
char killConnection;
SIpList ipList;
} SHeartBeatRsp;
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
uint32_t queryId; uint32_t queryId;
int64_t useconds; int64_t useconds;
int64_t stime; int64_t stime;
} SQDesc; } SCMQueryDesc;
typedef struct { typedef struct {
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
@ -822,17 +796,29 @@ typedef struct {
int64_t stime; int64_t stime;
int64_t slidingTime; int64_t slidingTime;
int64_t interval; int64_t interval;
} SSDesc; } SCMStreamDesc;
typedef struct { typedef struct {
int32_t numOfQueries; int32_t numOfQueries;
SQDesc qdesc[]; SCMQueryDesc qdesc[];
} SQList; } SCMQqueryList;
typedef struct { typedef struct {
int32_t numOfStreams; int32_t numOfStreams;
SSDesc sdesc[]; SCMStreamDesc sdesc[];
} SSList; } SCMStreamList;
typedef struct {
SCMQqueryList qlist;
SCMStreamList slist;
} SCMHeartBeatMsg;
typedef struct {
uint32_t queryId;
uint32_t streamId;
int8_t killConnection;
SRpcIpSet ipList;
} SCMHeartBeatRsp;
typedef struct { typedef struct {
uint64_t handle; uint64_t handle;

View File

@ -22,21 +22,21 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int32_t mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int32_t mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn); int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn); int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn);
int mgmtSaveQueryStreamList(char *cont, int contLen, SConnObj *pConn); int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg);
int mgmtKillQuery(char *qidstr, SConnObj *pConn); int32_t mgmtKillQuery(char *qidstr, SConnObj *pConn);
int mgmtKillStream(char *qidstr, SConnObj *pConn); int32_t mgmtKillStream(char *qidstr, SConnObj *pConn);
int mgmtKillConnection(char *qidstr, SConnObj *pConn); int32_t mgmtKillConnection(char *qidstr, SConnObj *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -28,13 +28,13 @@ int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType); extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType);
extern int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCfgMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessDropAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
extern int32_t (*mgmtProcessCreateAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn); extern int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -28,55 +28,55 @@ typedef struct {
} SCDesc; } SCDesc;
typedef struct { typedef struct {
int index; int32_t index;
int numOfQueries; int32_t numOfQueries;
SCDesc * connInfo; SCDesc * connInfo;
SCDesc **cdesc; SCDesc **cdesc;
SQDesc qdesc[]; SCMQueryDesc qdesc[];
} SQueryShow; } SQueryShow;
typedef struct { typedef struct {
int index; int32_t index;
int numOfStreams; int32_t numOfStreams;
SCDesc * connInfo; SCDesc * connInfo;
SCDesc **cdesc; SCDesc **cdesc;
SSDesc sdesc[]; SCMStreamDesc sdesc[];
} SStreamShow; } SStreamShow;
int mgmtSaveQueryStreamList(char *cont, int contLen, SConnObj *pConn) { int32_t mgmtSaveQueryStreamList(SCMHeartBeatMsg *pHBMsg) {
SAcctObj *pAcct = pConn->pAcct; // SAcctObj *pAcct = pConn->pAcct;
//
// if (contLen <= 0 || pAcct == NULL) {
// return 0;
// }
//
// pthread_mutex_lock(&pAcct->mutex);
//
// if (pConn->pQList) {
// pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries;
// pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams;
// }
//
// pConn->pQList = realloc(pConn->pQList, contLen);
// memcpy(pConn->pQList, cont, contLen);
//
// pConn->pSList = (SCMStreamList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SCMQueryDesc) + sizeof(SCMQqueryList));
//
// pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries;
// pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams;
//
// pthread_mutex_unlock(&pAcct->mutex);
if (contLen <= 0 || pAcct == NULL) { return TSDB_CODE_SUCCESS;
return 0;
}
pthread_mutex_lock(&pAcct->mutex);
if (pConn->pQList) {
pAcct->acctInfo.numOfQueries -= pConn->pQList->numOfQueries;
pAcct->acctInfo.numOfStreams -= pConn->pSList->numOfStreams;
}
pConn->pQList = realloc(pConn->pQList, contLen);
memcpy(pConn->pQList, cont, contLen);
pConn->pSList = (SSList *)(((char *)pConn->pQList) + pConn->pQList->numOfQueries * sizeof(SQDesc) + sizeof(SQList));
pAcct->acctInfo.numOfQueries += pConn->pQList->numOfQueries;
pAcct->acctInfo.numOfStreams += pConn->pSList->numOfStreams;
pthread_mutex_unlock(&pAcct->mutex);
return 0;
} }
int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) { int32_t mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) {
SAcctObj * pAcct = pConn->pAcct; SAcctObj * pAcct = pConn->pAcct;
SQueryShow *pQueryShow; SQueryShow *pQueryShow;
pthread_mutex_lock(&pAcct->mutex); pthread_mutex_lock(&pAcct->mutex);
pQueryShow = malloc(sizeof(SQDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow)); pQueryShow = malloc(sizeof(SCMQueryDesc) * pAcct->acctInfo.numOfQueries + sizeof(SQueryShow));
pQueryShow->numOfQueries = 0; pQueryShow->numOfQueries = 0;
pQueryShow->index = 0; pQueryShow->index = 0;
pQueryShow->connInfo = NULL; pQueryShow->connInfo = NULL;
@ -87,7 +87,7 @@ int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) {
pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *)); pQueryShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfQueries * sizeof(SCDesc *));
pConn = pAcct->pConn; pConn = pAcct->pConn;
SQDesc * pQdesc = pQueryShow->qdesc; SCMQueryDesc * pQdesc = pQueryShow->qdesc;
SCDesc * pCDesc = pQueryShow->connInfo; SCDesc * pCDesc = pQueryShow->connInfo;
SCDesc **ppCDesc = pQueryShow->cdesc; SCDesc **ppCDesc = pQueryShow->cdesc;
@ -97,10 +97,10 @@ int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) {
pCDesc->port = pConn->port; pCDesc->port = pConn->port;
strcpy(pCDesc->user, pConn->pUser->user); strcpy(pCDesc->user, pConn->pUser->user);
memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SQDesc) * pConn->pQList->numOfQueries); memcpy(pQdesc, pConn->pQList->qdesc, sizeof(SCMQueryDesc) * pConn->pQList->numOfQueries);
pQdesc += pConn->pQList->numOfQueries; pQdesc += pConn->pQList->numOfQueries;
pQueryShow->numOfQueries += pConn->pQList->numOfQueries; pQueryShow->numOfQueries += pConn->pQList->numOfQueries;
for (int i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc; for (int32_t i = 0; i < pConn->pQList->numOfQueries; ++i, ++ppCDesc) *ppCDesc = pCDesc;
pCDesc++; pCDesc++;
} }
@ -117,8 +117,8 @@ int mgmtGetQueries(SShowObj *pShow, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { int32_t mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0; int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta); SSchema *pSchema = tsGetSchema(pMeta);
@ -156,7 +156,7 @@ int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
pShow->offset[0] = 0; pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000; pShow->numOfRows = 1000000;
pShow->pNode = NULL; pShow->pNode = NULL;
@ -166,7 +166,7 @@ int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtKillQuery(char *qidstr, SConnObj *pConn) { int32_t mgmtKillQuery(char *qidstr, SConnObj *pConn) {
char *temp, *chr, idstr[64]; char *temp, *chr, idstr[64];
strcpy(idstr, qidstr); strcpy(idstr, qidstr);
@ -192,8 +192,8 @@ int mgmtKillQuery(char *qidstr, SConnObj *pConn) {
pConn = pAcct->pConn; pConn = pAcct->pConn;
while (pConn) { while (pConn) {
if (pConn->ip == ip && pConn->port == port && pConn->pQList) { if (pConn->ip == ip && pConn->port == port && pConn->pQList) {
int i; int32_t i;
SQDesc *pQDesc = pConn->pQList->qdesc; SCMQueryDesc *pQDesc = pConn->pQList->qdesc;
for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) { for (i = 0; i < pConn->pQList->numOfQueries; ++i, ++pQDesc) {
if (pQDesc->queryId == queryId) break; if (pQDesc->queryId == queryId) break;
} }
@ -219,17 +219,17 @@ _error:
return TSDB_CODE_INVALID_QUERY_ID; return TSDB_CODE_INVALID_QUERY_ID;
} }
int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int numOfRows = 0; int32_t numOfRows = 0;
char *pWrite; char *pWrite;
int cols = 0; int32_t cols = 0;
SQueryShow *pQueryShow = (SQueryShow *)pShow->pNode; SQueryShow *pQueryShow = (SQueryShow *)pShow->pNode;
if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index; if (rows > pQueryShow->numOfQueries - pQueryShow->index) rows = pQueryShow->numOfQueries - pQueryShow->index;
while (numOfRows < rows) { while (numOfRows < rows) {
SQDesc *pNode = pQueryShow->qdesc + pQueryShow->index; SCMQueryDesc *pNode = pQueryShow->qdesc + pQueryShow->index;
SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index]; SCDesc *pCDesc = pQueryShow->cdesc[pQueryShow->index];
cols = 0; cols = 0;
@ -269,13 +269,13 @@ int mgmtRetrieveQueries(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
return numOfRows; return numOfRows;
} }
int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) { int32_t mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) {
SAcctObj * pAcct = pConn->pAcct; SAcctObj * pAcct = pConn->pAcct;
SStreamShow *pStreamShow; SStreamShow *pStreamShow;
pthread_mutex_lock(&pAcct->mutex); pthread_mutex_lock(&pAcct->mutex);
pStreamShow = malloc(sizeof(SSDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow)); pStreamShow = malloc(sizeof(SCMStreamDesc) * pAcct->acctInfo.numOfStreams + sizeof(SQueryShow));
pStreamShow->numOfStreams = 0; pStreamShow->numOfStreams = 0;
pStreamShow->index = 0; pStreamShow->index = 0;
pStreamShow->connInfo = NULL; pStreamShow->connInfo = NULL;
@ -286,7 +286,7 @@ int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) {
pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *)); pStreamShow->cdesc = (SCDesc **)malloc(pAcct->acctInfo.numOfStreams * sizeof(SCDesc *));
pConn = pAcct->pConn; pConn = pAcct->pConn;
SSDesc * pSdesc = pStreamShow->sdesc; SCMStreamDesc * pSdesc = pStreamShow->sdesc;
SCDesc * pCDesc = pStreamShow->connInfo; SCDesc * pCDesc = pStreamShow->connInfo;
SCDesc **ppCDesc = pStreamShow->cdesc; SCDesc **ppCDesc = pStreamShow->cdesc;
@ -296,10 +296,10 @@ int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) {
pCDesc->port = pConn->port; pCDesc->port = pConn->port;
strcpy(pCDesc->user, pConn->pUser->user); strcpy(pCDesc->user, pConn->pUser->user);
memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SSDesc) * pConn->pSList->numOfStreams); memcpy(pSdesc, pConn->pSList->sdesc, sizeof(SCMStreamDesc) * pConn->pSList->numOfStreams);
pSdesc += pConn->pSList->numOfStreams; pSdesc += pConn->pSList->numOfStreams;
pStreamShow->numOfStreams += pConn->pSList->numOfStreams; pStreamShow->numOfStreams += pConn->pSList->numOfStreams;
for (int i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc; for (int32_t i = 0; i < pConn->pSList->numOfStreams; ++i, ++ppCDesc) *ppCDesc = pCDesc;
pCDesc++; pCDesc++;
} }
@ -316,8 +316,8 @@ int mgmtGetStreams(SShowObj *pShow, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { int32_t mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
int cols = 0; int32_t cols = 0;
SSchema *pSchema = tsGetSchema(pMeta); SSchema *pSchema = tsGetSchema(pMeta);
pShow->bytes[cols] = TSDB_USER_LEN; pShow->bytes[cols] = TSDB_USER_LEN;
@ -366,7 +366,7 @@ int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
pShow->numOfColumns = cols; pShow->numOfColumns = cols;
pShow->offset[0] = 0; pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; for (int32_t i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000; pShow->numOfRows = 1000000;
pShow->pNode = NULL; pShow->pNode = NULL;
@ -376,17 +376,17 @@ int mgmtGetStreamMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SConnObj *pConn) {
int numOfRows = 0; int32_t numOfRows = 0;
char *pWrite; char *pWrite;
int cols = 0; int32_t cols = 0;
SStreamShow *pStreamShow = (SStreamShow *)pShow->pNode; SStreamShow *pStreamShow = (SStreamShow *)pShow->pNode;
if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index; if (rows > pStreamShow->numOfStreams - pStreamShow->index) rows = pStreamShow->numOfStreams - pStreamShow->index;
while (numOfRows < rows) { while (numOfRows < rows) {
SSDesc *pNode = pStreamShow->sdesc + pStreamShow->index; SCMStreamDesc *pNode = pStreamShow->sdesc + pStreamShow->index;
SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index]; SCDesc *pCDesc = pStreamShow->cdesc[pStreamShow->index];
cols = 0; cols = 0;
@ -434,7 +434,7 @@ int mgmtRetrieveStreams(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
return numOfRows; return numOfRows;
} }
int mgmtKillStream(char *qidstr, SConnObj *pConn) { int32_t mgmtKillStream(char *qidstr, SConnObj *pConn) {
char *temp, *chr, idstr[64]; char *temp, *chr, idstr[64];
strcpy(idstr, qidstr); strcpy(idstr, qidstr);
@ -460,8 +460,8 @@ int mgmtKillStream(char *qidstr, SConnObj *pConn) {
pConn = pAcct->pConn; pConn = pAcct->pConn;
while (pConn) { while (pConn) {
if (pConn->ip == ip && pConn->port == port && pConn->pSList) { if (pConn->ip == ip && pConn->port == port && pConn->pSList) {
int i; int32_t i;
SSDesc *pSDesc = pConn->pSList->sdesc; SCMStreamDesc *pSDesc = pConn->pSList->sdesc;
for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) { for (i = 0; i < pConn->pSList->numOfStreams; ++i, ++pSDesc) {
if (pSDesc->streamId == streamId) break; if (pSDesc->streamId == streamId) break;
} }
@ -487,7 +487,7 @@ _error:
return TSDB_CODE_INVALID_STREAM_ID; return TSDB_CODE_INVALID_STREAM_ID;
} }
int mgmtKillConnection(char *qidstr, SConnObj *pConn) { int32_t mgmtKillConnection(char *qidstr, SConnObj *pConn) {
SConnObj *pConn1 = NULL; SConnObj *pConn1 = NULL;
char * temp, *chr, idstr[64]; char * temp, *chr, idstr[64];
strcpy(idstr, qidstr); strcpy(idstr, qidstr);

View File

@ -46,9 +46,11 @@ static void mgmtInitShowMsgFp();
void * tsShellConn = NULL; void * tsShellConn = NULL;
SConnObj *connList; SConnObj *connList;
void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code);
int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code);
int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int32_t, SConnObj *); static int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, void *ahandle);
static int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
void mgmtInitProcessShellMsg(); void mgmtInitProcessShellMsg();
int32_t mgmtRedirectMsg(SConnObj *pConn, int32_t msgType); int32_t mgmtRedirectMsg(SConnObj *pConn, int32_t msgType);
int32_t mgmtKillQuery(char *queryId, SConnObj *pConn); int32_t mgmtKillQuery(char *queryId, SConnObj *pConn);
@ -156,31 +158,8 @@ static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMe
// return 0; // return 0;
//} //}
/**
* check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one.
*
* @param pMsg
* @return
*/
bool mgmtCheckMeterMetaMsgType(char *pMsg) {
// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
//
// int16_t autoCreate = htons(pInfo->createFlag);
// STableInfo *table = mgmtGetTable(pInfo->meterId);
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue int32_t mgmtProcessMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
// bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1);
// if (addIntoTranQueue) {
// mTrace("meter:%s auto created task added", pInfo->meterId);
// }
// bool addIntoTranQueue = true;
// return addIntoTranQueue;
return 0;
}
int32_t mgmtProcessMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; // SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
// STabObj * pMeterObj = NULL; // STabObj * pMeterObj = NULL;
// SVgObj * pVgroup = NULL; // SVgObj * pVgroup = NULL;
@ -352,7 +331,7 @@ int32_t mgmtProcessMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
* | | | * | | |
* pStart pCurMeter pTail * pStart pCurMeter pTail
**/ **/
int32_t mgmtProcessMultiMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessMultiMeterMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
// SDbObj * pDbObj = NULL; // SDbObj * pDbObj = NULL;
// STabObj * pMeterObj = NULL; // STabObj * pMeterObj = NULL;
// SVgObj * pVgroup = NULL; // SVgObj * pVgroup = NULL;
@ -507,7 +486,7 @@ int32_t mgmtProcessMultiMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn
return 0; return 0;
} }
int32_t mgmtProcessMetricMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessMetricMetaMsg(void *pCont, int32_t contLen, void *ahandle) {
// SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg; // SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg;
// STabObj * pMetric; // STabObj * pMetric;
// STaosRsp * pRsp; // STaosRsp * pRsp;
@ -558,7 +537,7 @@ int32_t mgmtProcessMetricMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessCreateDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateDbMsg(void *pCont, int32_t contLen, void *ahandle) {
// SCreateDbMsg *pCreate = (SCreateDbMsg *)pMsg; // SCreateDbMsg *pCreate = (SCreateDbMsg *)pMsg;
// int32_t code = 0; // int32_t code = 0;
// //
@ -593,12 +572,12 @@ int32_t mgmtProcessCreateDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessCreateMnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateMnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
// return rpcSendResponse(pConn->thandle, TSDB_MSG_TYPE_CREATE_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); // return rpcSendResponse(pConn->thandle, TSDB_MSG_TYPE_CREATE_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t mgmtProcessAlterDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterDbMsg(void *pCont, int32_t contLen, void *ahandle) {
// SAlterDbMsg *pAlter = (SAlterDbMsg *)pMsg; // SAlterDbMsg *pAlter = (SAlterDbMsg *)pMsg;
// int32_t code = 0; // int32_t code = 0;
// //
@ -624,7 +603,7 @@ int32_t mgmtProcessAlterDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessKillQueryMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessKillQueryMsg(void *pCont, int32_t contLen, void *ahandle) {
// int32_t code = 0; // int32_t code = 0;
// SKillQuery *pKill = (SKillQuery *)pMsg; // SKillQuery *pKill = (SKillQuery *)pMsg;
// //
@ -639,7 +618,7 @@ int32_t mgmtProcessKillQueryMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessKillStreamMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessKillStreamMsg(void *pCont, int32_t contLen, void *ahandle) {
// int32_t code = 0; // int32_t code = 0;
// SKillStream *pKill = (SKillStream *)pMsg; // SKillStream *pKill = (SKillStream *)pMsg;
// //
@ -654,7 +633,7 @@ int32_t mgmtProcessKillStreamMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessKillConnectionMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessKillConnectionMsg(void *pCont, int32_t contLen, void *ahandle) {
// int32_t code = 0; // int32_t code = 0;
// SKillConnection *pKill = (SKillConnection *)pMsg; // SKillConnection *pKill = (SKillConnection *)pMsg;
// //
@ -669,7 +648,7 @@ int32_t mgmtProcessKillConnectionMsg(char *pMsg, int32_t msgLen, SConnObj *pConn
return 0; return 0;
} }
int32_t mgmtProcessCreateUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateUserMsg(void *pCont, int32_t contLen, void *ahandle) {
// SCreateUserMsg *pCreate = (SCreateUserMsg *)pMsg; // SCreateUserMsg *pCreate = (SCreateUserMsg *)pMsg;
// int32_t code = 0; // int32_t code = 0;
// //
@ -691,7 +670,7 @@ int32_t mgmtProcessCreateUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessAlterUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterUserMsg(void *pCont, int32_t contLen, void *ahandle) {
// SAlterUserMsg *pAlter = (SAlterUserMsg *)pMsg; // SAlterUserMsg *pAlter = (SAlterUserMsg *)pMsg;
// int32_t code = 0; // int32_t code = 0;
// SUserObj * pUser; // SUserObj * pUser;
@ -803,7 +782,7 @@ int32_t mgmtProcessAlterUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessDropUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessDropUserMsg(void *pCont, int32_t contLen, void *ahandle) {
// SDropUserMsg *pDrop = (SDropUserMsg *)pMsg; // SDropUserMsg *pDrop = (SDropUserMsg *)pMsg;
// int32_t code = 0; // int32_t code = 0;
// SUserObj * pUser; // SUserObj * pUser;
@ -862,7 +841,7 @@ int32_t mgmtProcessDropUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessDropDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessDropDbMsg(void *pCont, int32_t contLen, void *ahandle) {
// SDropDbMsg *pDrop = (SDropDbMsg *)pMsg; // SDropDbMsg *pDrop = (SDropDbMsg *)pMsg;
// int32_t code; // int32_t code;
// //
@ -883,7 +862,7 @@ int32_t mgmtProcessDropDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessUseDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessUseDbMsg(void *pCont, int32_t contLen, void *ahandle) {
// SUseDbMsg *pUse = (SUseDbMsg *)pMsg; // SUseDbMsg *pUse = (SUseDbMsg *)pMsg;
// int32_t code; // int32_t code;
// //
@ -933,7 +912,7 @@ static void mgmtInitShowMsgFp() {
mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes;
} }
int32_t mgmtProcessShowMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessShowMsg(void *pCont, int32_t contLen, void *ahandle) {
// SShowMsg * pShowMsg = (SShowMsg *)pMsg; // SShowMsg * pShowMsg = (SShowMsg *)pMsg;
// STaosRsp * pRsp; // STaosRsp * pRsp;
// char * pStart; // char * pStart;
@ -992,7 +971,7 @@ int32_t mgmtProcessShowMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessRetrieveMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessRetrieveMsg(void *pCont, int32_t contLen, void *ahandle) {
// SRetrieveMeterMsg *pRetrieve; // SRetrieveMeterMsg *pRetrieve;
// SRetrieveMeterRsp *pRsp; // SRetrieveMeterRsp *pRsp;
// int32_t rowsToRead = 0, size = 0, rowsRead = 0; // int32_t rowsToRead = 0, size = 0, rowsRead = 0;
@ -1080,7 +1059,7 @@ int32_t mgmtProcessRetrieveMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessCreateTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateTableMsg(void *pCont, int32_t contLen, void *ahandle) {
// SCreateTableMsg *pCreate = (SCreateTableMsg *)pMsg; // SCreateTableMsg *pCreate = (SCreateTableMsg *)pMsg;
// int32_t code; // int32_t code;
// SSchema * pSchema; // SSchema * pSchema;
@ -1135,7 +1114,7 @@ int32_t mgmtProcessCreateTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessDropTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessDropTableMsg(void *pCont, int32_t contLen, void *ahandle) {
// SDropTableMsg *pDrop = (SDropTableMsg *)pMsg; // SDropTableMsg *pDrop = (SDropTableMsg *)pMsg;
// int32_t code; // int32_t code;
// //
@ -1161,7 +1140,7 @@ int32_t mgmtProcessDropTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessAlterTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterTableMsg(void *pCont, int32_t contLen, void *ahandle) {
// SAlterTableMsg *pAlter = (SAlterTableMsg *)pMsg; // SAlterTableMsg *pAlter = (SAlterTableMsg *)pMsg;
// int32_t code; // int32_t code;
// //
@ -1202,7 +1181,7 @@ int32_t mgmtProcessAlterTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int32_t mgmtProcessCfgDnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCfgDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
// int32_t code = 0; // int32_t code = 0;
// SCfgMsg *pCfg = (SCfgMsg *)pMsg; // SCfgMsg *pCfg = (SCfgMsg *)pMsg;
// //
@ -1220,80 +1199,51 @@ int32_t mgmtProcessCfgDnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// //
// if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user); // if (code == 0) mTrace("dnode:%s is configured by %s", pCfg->ip, pConn->pUser->user);
// //
// return 0;
//}
//
//int32_t mgmtProcessHeartBeatMsg(char *cont, int32_t contLen, SConnObj *pConn) {
// char * pStart, *pMsg;
// int32_t msgLen;
// STaosRsp *pRsp;
//
// mgmtSaveQueryStreamList(cont, contLen, pConn);
//
// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_HEARTBEAT_RSP, 128);
// if (pStart == NULL) return 0;
// pMsg = pStart;
// pRsp = (STaosRsp *)pMsg;
// pRsp->code = 0;
// pMsg = (char *)pRsp->more;
//
// SHeartBeatRsp *pHBRsp = (SHeartBeatRsp *)pRsp->more;
// pHBRsp->queryId = pConn->queryId;
// pConn->queryId = 0;
// pHBRsp->streamId = pConn->streamId;
// pHBRsp->streamId = pConn->streamId;
// pConn->streamId = 0;
// pHBRsp->killConnection = pConn->killConnection;
//
// if (pConn->usePublicIp) {
// if (pSdbPublicIpList != NULL) {
// int32_t size = pSdbPublicIpList->numOfIps * 4;
// pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps;
// memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size);
// pMsg += sizeof(SHeartBeatRsp) + size;
// } else {
// pHBRsp->ipList.numOfIps = 0;
// pMsg += sizeof(SHeartBeatRsp);
// }
//
// } else {
// if (pSdbIpList != NULL) {
// int32_t size = pSdbIpList->numOfIps * 4;
// pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps;
// memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size);
// pMsg += sizeof(SHeartBeatRsp) + size;
// } else {
// pHBRsp->ipList.numOfIps = 0;
// pMsg += sizeof(SHeartBeatRsp);
// }
// }
// msgLen = pMsg - pStart;
//
// taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
return 0; return 0;
} }
void mgmtEstablishConn(SConnObj *pConn) { int32_t mgmtProcessHeartBeatMsg(void *pCont, int32_t contLen, void *ahandle) {
// atomic_fetch_add_32(&mgmtShellConns, 1); SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) pCont;
// atomic_fetch_add_32(&sdbExtConns, 1); mgmtSaveQueryStreamList(pHBMsg);
// pConn->stime = taosGetTimestampMs();
// SCMHeartBeatRsp *pHBRsp = (SCMHeartBeatRsp *) rpcMallocCont(contLen);
// if (strcmp(pConn->pUser->user, "root") == 0) { if (pHBRsp == NULL) {
// pConn->superAuth = 1; rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
// pConn->writeAuth = 1; rpcFreeCont(pCont);
// } else { return TSDB_CODE_SERV_OUT_OF_MEMORY;
// pConn->superAuth = pConn->pUser->superAuth; }
// pConn->writeAuth = pConn->pUser->writeAuth;
// if (pConn->superAuth) { SRpcConnInfo connInfo;
// pConn->writeAuth = 1; rpcGetConnInfo(ahandle, &connInfo);
// }
// } pHBRsp->ipList.index = 0;
// pHBRsp->ipList.port = htons(tsMgmtShellPort);
// int32_t tempint32; pHBRsp->ipList.numOfIps = 0;
// uint32_t tempuint32; if (pSdbPublicIpList != NULL && pSdbIpList != NULL) {
// taosGetRpcConnInfo(pConn->thandle, &tempuint32, &pConn->ip, &pConn->port, &tempint32, &tempint32); pHBRsp->ipList.numOfIps = htons(pSdbPublicIpList->numOfIps);
// mgmtAddConnIntoAcct(pConn); if (connInfo.serverIp == tsPublicIpInt) {
for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) {
pHBRsp->ipList.ip[i] = htonl(pSdbPublicIpList->ip[i]);
}
} else {
for (int i = 0; i < pSdbIpList->numOfIps; ++i) {
pHBRsp->ipList.ip[i] = htonl(pSdbIpList->ip[i]);
}
}
}
/*
* TODO
* Dispose kill stream or kill query message
*/
pHBRsp->queryId = 0;
pHBRsp->streamId = 0;
pHBRsp->killConnection = 0;
rpcSendResponse(ahandle, TSDB_CODE_SUCCESS, pHBRsp, sizeof(SCMHeartBeatMsg));
rpcFreeCont(pCont);
return TSDB_CODE_SUCCESS;
} }
int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
@ -1313,36 +1263,32 @@ int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtProcessConnectMsg(int8_t type, void *pCont, int32_t contLen, void *ahandle, int32_t code) { static int32_t mgmtProcessConnectMsg(void *pCont, int32_t contLen, void *ahandle) {
SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont; SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont;
SRpcConnInfo connInfo; SRpcConnInfo connInfo;
rpcGetConnInfo(ahandle, &connInfo); rpcGetConnInfo(ahandle, &connInfo);
int32_t code;
SUserObj *pUser = mgmtGetUser(connInfo.user); SUserObj *pUser = mgmtGetUser(connInfo.user);
if (pUser == NULL) { if (pUser == NULL) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); code = TSDB_CODE_INVALID_USER;
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); goto connect_over;
return TSDB_CODE_INVALID_USER;
} }
if (mgmtCheckExpired()) { if (mgmtCheckExpired()) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); code = TSDB_CODE_GRANT_EXPIRED;
rpcSendResponse(ahandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0); goto connect_over;
return TSDB_CODE_GRANT_EXPIRED;
} }
SAcctObj *pAcct = mgmtGetAcct(pUser->acct); SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
if (pAcct == NULL) { if (pAcct == NULL) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); code = TSDB_CODE_INVALID_ACCT;
rpcSendResponse(ahandle, TSDB_CODE_INVALID_ACCT, NULL, 0); goto connect_over;
return TSDB_CODE_INVALID_ACCT;
} }
code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); code = taosCheckVersion(pConnectMsg->clientVersion, version, 3);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); goto connect_over;
rpcSendResponse(ahandle, code, NULL, 0);
return code;
} }
if (pConnectMsg->db[0]) { if (pConnectMsg->db[0]) {
@ -1350,131 +1296,101 @@ int32_t mgmtProcessConnectMsg(int8_t type, void *pCont, int32_t contLen, void *a
sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
SDbObj *pDb = mgmtGetDb(dbName); SDbObj *pDb = mgmtGetDb(dbName);
if (pDb == NULL) { if (pDb == NULL) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); code = TSDB_CODE_INVALID_DB;
rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0); goto connect_over;
return TSDB_CODE_INVALID_DB;
} }
} }
SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp)); SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
if (pConnectRsp == NULL) { if (pConnectRsp == NULL) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); code = TSDB_CODE_SERV_OUT_OF_MEMORY;
rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0); goto connect_over;
return TSDB_CODE_SERV_OUT_OF_MEMORY;
} }
sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
strcpy(pConnectRsp->serverVersion, version); strcpy(pConnectRsp->serverVersion, version);
pConnectRsp->writeAuth = pUser->writeAuth; pConnectRsp->writeAuth = pUser->writeAuth;
pConnectRsp->superAuth = pUser->superAuth; pConnectRsp->superAuth = pUser->superAuth;
pConnectRsp->ipList.index = 0;
pConnectRsp->index = 0; pConnectRsp->ipList.port = htons(tsMgmtShellPort);
pConnectRsp->ipList.numOfIps = 0;
if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { if (pSdbPublicIpList != NULL && pSdbIpList != NULL) {
pConnectRsp->numOfIps = htons(pSdbPublicIpList->numOfIps); pConnectRsp->ipList.numOfIps = htons(pSdbPublicIpList->numOfIps);
pConnectRsp->port = htons(tsMgmtShellPort);
if (connInfo.serverIp == tsPublicIpInt) { if (connInfo.serverIp == tsPublicIpInt) {
for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) { for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) {
pConnectRsp->ip[i] = htonl(pSdbPublicIpList->ip[i]); pConnectRsp->ipList.ip[i] = htonl(pSdbPublicIpList->ip[i]);
} }
} else { } else {
for (int i = 0; i < pSdbIpList->numOfIps; ++i) { for (int i = 0; i < pSdbIpList->numOfIps; ++i) {
pConnectRsp->ip[i] = htonl(pSdbIpList->ip[i]); pConnectRsp->ipList.ip[i] = htonl(pSdbIpList->ip[i]);
} }
} }
} else {
pConnectRsp->numOfIps = 0;
pConnectRsp->port = htons(tsMgmtShellPort);
} }
connect_over:
if (code != TSDB_CODE_SUCCESS) {
mLError("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
rpcSendResponse(ahandle, code, NULL, 0);
} else {
mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code); mLPrint("user:%s login from %s, code:%d", connInfo.user, taosIpStr(connInfo.clientIp), code);
return TSDB_CODE_SUCCESS; rpcSendResponse(ahandle, code, pConnectRsp, sizeof(pConnectRsp));
}
rpcFreeCont(pCont);
return code;
} }
void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code) {
// SIntMsg * pMsg = (SIntMsg *)msg; /**
// SConnObj *pConn = (SConnObj *)ahandle; * check if we need to add mgmtProcessMeterMetaMsg into tranQueue, which will be executed one-by-one.
// */
// if (msg == NULL) { static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
// if (pConn) { SMeterInfoMsg *pInfo = (SMeterInfoMsg *) pMsg;
// mgmtRemoveConnFromAcct(pConn); int16_t autoCreate = htons(pInfo->createFlag);
// atomic_fetch_sub_32(&mgmtShellConns, 1); STableInfo *pTable = mgmtGetTable(pInfo->meterId);
// atomic_fetch_sub_32(&sdbExtConns, 1);
// mTrace("connection from %s is closed", pConn->pUser->user); // If table does not exists and autoCreate flag is set, we add the handler into task queue
// memset(pConn, 0, sizeof(SConnObj)); bool addIntoTranQueue = (pTable == NULL && autoCreate == 1);
// } if (addIntoTranQueue) {
// mTrace("meter:%s auto created task added", pInfo->meterId);
// return NULL; }
// }
// return addIntoTranQueue;
//#ifdef CLUSTER }
// if (sdbInited == NULL || sdbStatus != SDB_STATUS_SERVING) {
// taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
// mTrace("shell msg is ignored since SDB is not ready"); if ((type == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(pCont))) ||
// } type == TSDB_MSG_TYPE_STABLE_META || type == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
//#endif type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_MULTI_TABLE_META ||
// type == TSDB_MSG_TYPE_CONNECT) {
// if (pConn == NULL) { return true;
// pConn = connList + pMsg->destId; }
// pConn->thandle = thandle;
// strcpy(pConn->user, pMsg->meterId); return false;
// pConn->usePublicIp = (pMsg->destIp == tsPublicIpInt ? 1 : 0); }
// mTrace("pConn:%p is rebuild, destIp:%s publicIp:%s usePublicIp:%u",
// pConn, taosIpStr(pMsg->destIp), taosIpStr(tsPublicIpInt), pConn->usePublicIp); static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) {
// } if (sdbGetRunStatus() != SDB_STATUS_SERVING) {
// mTrace("shell msg is ignored since SDB is not ready");
// if (pMsg->msgType == TSDB_MSG_TYPE_CONNECT) { rpcSendResponse(ahandle, TSDB_CODE_NOT_READY, NULL, 0);
// (*mgmtProcessShellMsg[pMsg->msgType])((char *)pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pConn); return;
// } else { }
// SMgmtHead *pHead = (SMgmtHead *)pMsg->content;
// if (pConn->pAcct == NULL) { if (mgmtCheckMsgReadOnly(type, pCont)) {
// pConn->pUser = mgmtGetUser(pConn->user); (*mgmtProcessShellMsg[(int8_t)type])(pCont, contLen, ahandle);
// if (pConn->pUser) { } else {
// pConn->pAcct = mgmtGetAcct(pConn->pUser->acct); if (mgmtProcessShellMsg[(int8_t)type]) {
// mgmtEstablishConn(pConn); SSchedMsg schedMsg;
// mTrace("login from:%x:%hu", pConn->ip, htons(pConn->port)); schedMsg.msg = malloc(contLen);
// } memcpy(schedMsg.msg, pCont, contLen);
// } schedMsg.fp = mgmtProcessTranRequest;
// schedMsg.tfp = NULL;
// if (pConn->pAcct) { schedMsg.thandle = ahandle;
// if (pConn->pDb == NULL || strncmp(pConn->pDb->name, pHead->db, tListLen(pConn->pDb->name)) != 0) { taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
// pConn->pDb = mgmtGetDb(pHead->db); } else {
// } mError("%s from shell is not processed", taosMsg[(int8_t)type]);
// }
// char *cont = (char *)pMsg->content + sizeof(SMgmtHead); }
// int32_t contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
//
// // read-only request can be executed concurrently
// if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) ||
// pMsg->msgType == TSDB_MSG_TYPE_STABLE_META || pMsg->msgType == TSDB_MSG_TYPE_DNODE_RETRIEVE ||
// pMsg->msgType == TSDB_MSG_TYPE_SHOW || pMsg->msgType == TSDB_MSG_TYPE_MULTI_TABLE_META) {
// (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn);
// } else {
// if (mgmtProcessShellMsg[pMsg->msgType]) {
// SSchedMsg schedMsg;
// schedMsg.msg = malloc(pMsg->msgLen); // Message to deal with
// memcpy(schedMsg.msg, pMsg, pMsg->msgLen);
//
// schedMsg.fp = mgmtProcessTranRequest;
// schedMsg.tfp = NULL;
// schedMsg.thandle = pConn;
//
// taosScheduleTask(tsMgmtTranQhandle, &schedMsg);
// } else {
// mError("%s from shell is not processed", taosMsg[pMsg->msgType]);
// }
// }
// } else {
// taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_DISCONNECTED);
// }
// }
//
// if (pConn->pAcct == NULL) {
// taosCloseRpcConn(pConn->thandle);
// memset(pConn, 0, sizeof(SConnObj)); // close the connection;
// pConn = NULL;
// }
//
// return pConn;
} }
void mgmtInitProcessShellMsg() { void mgmtInitProcessShellMsg() {
@ -1499,7 +1415,7 @@ void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DNODE_RETRIEVE] = mgmtProcessRetrieveMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_SHOW] = mgmtProcessShowMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
// mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_DNODE] = mgmtProcessCreateDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_DROP_DNODE] = mgmtProcessDropDnodeMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CREATE_MNODE] = mgmtProcessCreateMnodeMsg;
@ -1516,44 +1432,44 @@ int32_t mgmtCheckRedirectMsgImp(SConnObj *pConn, int32_t msgType) {
} }
int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType) = mgmtCheckRedirectMsgImp; int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType) = mgmtCheckRedirectMsgImp;
int32_t mgmtProcessAlterAcctMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterAcctMsgImp(void *pCont, int32_t contLen, void *ahandle) {
//return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_ALTER_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessAlterAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessAlterAcctMsgImp; int32_t (*mgmtProcessAlterAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessAlterAcctMsgImp;
int32_t mgmtProcessCreateDnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateDnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) {
//return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_DNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_DNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessCreateDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessCreateDnodeMsgImp; int32_t (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessCreateDnodeMsgImp;
int32_t mgmtProcessCfgMnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCfgMnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) {
//return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CFG_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CFG_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessCfgMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessCfgMnodeMsgImp; int32_t (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessCfgMnodeMsgImp;
int32_t mgmtProcessDropMnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessDropMnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) {
//return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessDropMnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessDropMnodeMsgImp; int32_t (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessDropMnodeMsgImp;
int32_t mgmtProcessDropDnodeMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessDropDnodeMsgImp(void *pCont, int32_t contLen, void *ahandle) {
//return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_DNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); //return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_DNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessDropDnodeMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessDropDnodeMsgImp; int32_t (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessDropDnodeMsgImp;
int32_t mgmtProcessDropAcctMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessDropAcctMsgImp(void *pCont, int32_t contLen, void *ahandle) {
// return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); // return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_DROP_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessDropAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessDropAcctMsgImp; int32_t (*mgmtProcessDropAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessDropAcctMsgImp;
int32_t mgmtProcessCreateAcctMsgImp(char *pMsg, int32_t msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateAcctMsgImp(void *pCont, int32_t contLen, void *ahandle) {
// return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT); // return taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_ACCT_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int32_t (*mgmtProcessCreateAcctMsg)(char *pMsg, int32_t msgLen, SConnObj *pConn) = mgmtProcessCreateAcctMsgImp; int32_t (*mgmtProcessCreateAcctMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessCreateAcctMsgImp;

View File

@ -24,8 +24,9 @@
extern char version[]; extern char version[];
const int16_t sdbFileVersion = 0; const int16_t sdbFileVersion = 0;
int sdbExtConns = 0; int sdbExtConns = 0;
SIpList *pSdbIpList = NULL; SRpcIpSet *pSdbIpList = NULL;
SIpList *pSdbPublicIpList = NULL; SRpcIpSet *pSdbPublicIpList = NULL;
SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self
#ifdef CLUSTER #ifdef CLUSTER
int sdbMaster = 0; int sdbMaster = 0;
@ -57,6 +58,13 @@ int64_t sdbGetVersion() {
return sdbVersion; return sdbVersion;
}; };
int32_t sdbGetRunStatus() {
if (sdbInited == NULL) {
return SDB_STATUS_OFFLINE;
}
return sdbStatus;
}
void sdbFinishCommit(void *handle) { void sdbFinishCommit(void *handle) {
SSdbTable *pTable = (SSdbTable *)handle; SSdbTable *pTable = (SSdbTable *)handle;
uint32_t sdbEcommit = SDB_ENDCOMMIT; uint32_t sdbEcommit = SDB_ENDCOMMIT;