Merge pull request #1249 from taosdata/refact/rpc

add destIp support for NAT
This commit is contained in:
slguan 2020-02-21 20:59:16 +08:00 committed by GitHub
commit ce0f8aae66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 187 additions and 193 deletions

View File

@ -254,6 +254,8 @@ typedef struct _user_obj {
char reserved[16]; char reserved[16];
char updateEnd[1]; char updateEnd[1];
struct _user_obj *prev, *next; struct _user_obj *prev, *next;
int8_t writeAuth;
int8_t superAuth;
} SUserObj; } SUserObj;
typedef struct { typedef struct {

View File

@ -187,6 +187,13 @@ typedef enum {
extern char *taosMsg[]; extern char *taosMsg[];
#define TSDB_MSG_DEF_MAX_MPEERS 5
#define TSDB_MSG_DEF_VERSION_LEN 64
#define TSDB_MSG_DEF_DB_LEN 128
#define TSDB_MSG_DEF_USER_LEN 128
#define TSDB_MSG_DEF_TABLE_LEN 128
#define TSDB_MSG_DEF_ACCT_LEN 128
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct { typedef struct {
@ -325,9 +332,23 @@ typedef struct {
} SAlterTableMsg; } SAlterTableMsg;
typedef struct { typedef struct {
char clientVersion[TSDB_VERSION_LEN]; char clientVersion[TSDB_MSG_DEF_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN]; char msgVersion[TSDB_MSG_DEF_VERSION_LEN];
} SConnectMsg; char db[TSDB_MSG_DEF_DB_LEN];
} SCMConnectMsg;
typedef struct {
char acctId[TSDB_MSG_DEF_ACCT_LEN];
char serverVersion[TSDB_MSG_DEF_VERSION_LEN];
int8_t writeAuth;
int8_t superAuth;
int8_t usePublicIp;
int16_t index;
int16_t numOfIps;
uint16_t port;
uint32_t ip[TSDB_MSG_DEF_MAX_MPEERS];
} SCMConnectRsp;
typedef struct { typedef struct {
int32_t maxUsers; int32_t maxUsers;
@ -360,13 +381,6 @@ typedef struct {
char db[TSDB_TABLE_ID_LEN]; char db[TSDB_TABLE_ID_LEN];
} SMgmtHead; } SMgmtHead;
typedef struct {
char acctId[TSDB_ACCT_LEN];
char version[TSDB_VERSION_LEN];
char writeAuth;
char superAuth;
} SConnectRsp;
typedef struct { typedef struct {
short vnode; short vnode;
int32_t sid; int32_t sid;

View File

@ -46,8 +46,9 @@ typedef struct {
} SRpcIpSet; } SRpcIpSet;
typedef struct { typedef struct {
uint32_t sourceIp; uint32_t clientIp;
uint16_t sourcePort; uint16_t clientPort;
uint32_t serverIp;
char *user; char *user;
} SRpcConnInfo; } SRpcConnInfo;

View File

@ -24,7 +24,7 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include "mnode.h" #include "mnode.h"
int mgmtInitShell(); int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType); extern int32_t (*mgmtCheckRedirectMsg)(SConnObj *pConn, int32_t msgType);

View File

@ -46,32 +46,32 @@ static void mgmtInitShowMsgFp();
void * tsShellConn = NULL; void * tsShellConn = NULL;
SConnObj *connList; SConnObj *connList;
void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code); void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code);
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
int (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int, SConnObj *); int32_t (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(char *, int32_t, SConnObj *);
void mgmtInitProcessShellMsg(); void mgmtInitProcessShellMsg();
int mgmtRedirectMsg(SConnObj *pConn, int msgType); int32_t mgmtRedirectMsg(SConnObj *pConn, int32_t msgType);
int mgmtKillQuery(char *queryId, SConnObj *pConn); int32_t mgmtKillQuery(char *queryId, SConnObj *pConn);
void mgmtProcessTranRequest(SSchedMsg *pSchedMsg) { void mgmtProcessTranRequest(SSchedMsg *pSchedMsg) {
SIntMsg * pMsg = (SIntMsg *)(pSchedMsg->msg); SIntMsg * pMsg = (SIntMsg *)(pSchedMsg->msg);
SConnObj *pConn = (SConnObj *)(pSchedMsg->thandle); SConnObj *pConn = (SConnObj *)(pSchedMsg->thandle);
char *cont = (char *)pMsg->content + sizeof(SMgmtHead); char *cont = (char *)pMsg->content + sizeof(SMgmtHead);
int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); int32_t contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
if (pConn->pAcct) (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn); if (pConn->pAcct) (*mgmtProcessShellMsg[pMsg->msgType])(cont, contLen, pConn);
if (pSchedMsg->msg) free(pSchedMsg->msg); if (pSchedMsg->msg) free(pSchedMsg->msg);
} }
int mgmtInitShell() { int32_t mgmtInitShell() {
SRpcInit rpcInit; SRpcInit rpcInit;
mgmtInitProcessShellMsg(); mgmtInitProcessShellMsg();
mgmtInitShowMsgFp(); mgmtInitShowMsgFp();
int size = sizeof(SConnObj) * tsMaxShellConns; int32_t size = sizeof(SConnObj) * tsMaxShellConns;
connList = (SConnObj *)malloc(size); connList = (SConnObj *)malloc(size);
if (connList == NULL) { if (connList == NULL) {
mError("failed to malloc for connList to shell"); mError("failed to malloc for connList to shell");
@ -79,7 +79,7 @@ int mgmtInitShell() {
} }
memset(connList, 0, size); memset(connList, 0, size);
int numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0; int32_t numOfThreads = tsNumOfCores * tsNumOfThreadsPerCore / 4.0;
if (numOfThreads < 1) numOfThreads = 1; if (numOfThreads < 1) numOfThreads = 1;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
@ -113,7 +113,7 @@ void mgmtCleanUpShell() {
static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32_t numOfCols) { static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32_t numOfCols) {
SSchema *pMeterSchema = (SSchema *)(pMeterObj->schema); SSchema *pMeterSchema = (SSchema *)(pMeterObj->schema);
for (int i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSchema->type = pMeterSchema[i].type; pSchema->type = pMeterSchema[i].type;
strcpy(pSchema->name, pMeterSchema[i].name); strcpy(pSchema->name, pMeterSchema[i].name);
pSchema->bytes = htons(pMeterSchema[i].bytes); pSchema->bytes = htons(pMeterSchema[i].bytes);
@ -180,7 +180,7 @@ bool mgmtCheckMeterMetaMsgType(char *pMsg) {
return 0; return 0;
} }
int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg; // SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
// STabObj * pMeterObj = NULL; // STabObj * pMeterObj = NULL;
// SVgObj * pVgroup = NULL; // SVgObj * pVgroup = NULL;
@ -191,7 +191,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// //
// pInfo->createFlag = htons(pInfo->createFlag); // pInfo->createFlag = htons(pInfo->createFlag);
// //
// int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS + // int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
// sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE; // sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE;
// //
// SDbObj *pDb = NULL; // SDbObj *pDb = NULL;
@ -320,7 +320,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// pRsp->code = TSDB_CODE_INVALID_TABLE; // pRsp->code = TSDB_CODE_INVALID_TABLE;
// goto _exit_code; // goto _exit_code;
// } // }
// for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { // for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
// if (pConn->usePublicIp) { // if (pConn->usePublicIp) {
// pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; // pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
// pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); // pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
@ -352,7 +352,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
* | | | * | | |
* pStart pCurMeter pTail * pStart pCurMeter pTail
**/ **/
int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessMultiMeterMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SDbObj * pDbObj = NULL; // SDbObj * pDbObj = NULL;
// STabObj * pMeterObj = NULL; // STabObj * pMeterObj = NULL;
// SVgObj * pVgroup = NULL; // SVgObj * pVgroup = NULL;
@ -365,7 +365,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// char * str = pMsg + sizeof(SMultiMeterInfoMsg); // char * str = pMsg + sizeof(SMultiMeterInfoMsg);
// pInfo->numOfMeters = htonl(pInfo->numOfMeters); // pInfo->numOfMeters = htonl(pInfo->numOfMeters);
// //
// int size = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice // int32_t size = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
// //
// char *pNewMsg; // char *pNewMsg;
// if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) { // if ((pStart = mgmtForMultiAllocMsg(pConn, size, &pNewMsg, &pRsp)) == NULL) {
@ -465,7 +465,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// goto _error_exit_code; // goto _error_exit_code;
// } // }
// //
// for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { // for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
// if (pConn->usePublicIp) { // if (pConn->usePublicIp) {
// pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; // pMeta->meta.vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp;
// pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode); // pMeta->meta.vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
@ -507,7 +507,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessMetricMetaMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg; // SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg;
// STabObj * pMetric; // STabObj * pMetric;
// STaosRsp * pRsp; // STaosRsp * pRsp;
@ -558,9 +558,9 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessCreateDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SCreateDbMsg *pCreate = (SCreateDbMsg *)pMsg; // SCreateDbMsg *pCreate = (SCreateDbMsg *)pMsg;
// int code = 0; // int32_t code = 0;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_DB_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_DB_RSP) != 0) {
// return 0; // return 0;
@ -593,14 +593,14 @@ int mgmtProcessCreateDbMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessCreateMnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateMnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// return rpcSendResponse(pConn->thandle, TSDB_MSG_TYPE_CREATE_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT); // return rpcSendResponse(pConn->thandle, TSDB_MSG_TYPE_CREATE_MNODE_RSP, TSDB_CODE_OPS_NOT_SUPPORT);
return 0; return 0;
} }
int mgmtProcessAlterDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SAlterDbMsg *pAlter = (SAlterDbMsg *)pMsg; // SAlterDbMsg *pAlter = (SAlterDbMsg *)pMsg;
// int code = 0; // int32_t code = 0;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_DB_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_DB_RSP) != 0) {
// return 0; // return 0;
@ -624,8 +624,8 @@ int mgmtProcessAlterDbMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessKillQueryMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessKillQueryMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// int code = 0; // int32_t code = 0;
// SKillQuery *pKill = (SKillQuery *)pMsg; // SKillQuery *pKill = (SKillQuery *)pMsg;
// //
// if (!pConn->writeAuth) { // if (!pConn->writeAuth) {
@ -639,8 +639,8 @@ int mgmtProcessKillQueryMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessKillStreamMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessKillStreamMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// int code = 0; // int32_t code = 0;
// SKillStream *pKill = (SKillStream *)pMsg; // SKillStream *pKill = (SKillStream *)pMsg;
// //
// if (!pConn->writeAuth) { // if (!pConn->writeAuth) {
@ -654,8 +654,8 @@ int mgmtProcessKillStreamMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessKillConnectionMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessKillConnectionMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// int code = 0; // int32_t code = 0;
// SKillConnection *pKill = (SKillConnection *)pMsg; // SKillConnection *pKill = (SKillConnection *)pMsg;
// //
// if (!pConn->superAuth) { // if (!pConn->superAuth) {
@ -669,9 +669,9 @@ int mgmtProcessKillConnectionMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessCreateUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SCreateUserMsg *pCreate = (SCreateUserMsg *)pMsg; // SCreateUserMsg *pCreate = (SCreateUserMsg *)pMsg;
// int code = 0; // int32_t code = 0;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_USER_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_USER_RSP) != 0) {
// return 0; // return 0;
@ -691,9 +691,9 @@ int mgmtProcessCreateUserMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessAlterUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SAlterUserMsg *pAlter = (SAlterUserMsg *)pMsg; // SAlterUserMsg *pAlter = (SAlterUserMsg *)pMsg;
// int code = 0; // int32_t code = 0;
// SUserObj * pUser; // SUserObj * pUser;
// SUserObj * pOperUser; // SUserObj * pOperUser;
// //
@ -803,9 +803,9 @@ int mgmtProcessAlterUserMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessDropUserMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessDropUserMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SDropUserMsg *pDrop = (SDropUserMsg *)pMsg; // SDropUserMsg *pDrop = (SDropUserMsg *)pMsg;
// int code = 0; // int32_t code = 0;
// SUserObj * pUser; // SUserObj * pUser;
// SUserObj * pOperUser; // SUserObj * pOperUser;
// //
@ -862,9 +862,9 @@ int mgmtProcessDropUserMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessDropDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessDropDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SDropDbMsg *pDrop = (SDropDbMsg *)pMsg; // SDropDbMsg *pDrop = (SDropDbMsg *)pMsg;
// int code; // int32_t code;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_DB_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_DB_RSP) != 0) {
// return 0; // return 0;
@ -883,9 +883,9 @@ int mgmtProcessDropDbMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessUseDbMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessUseDbMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SUseDbMsg *pUse = (SUseDbMsg *)pMsg; // SUseDbMsg *pUse = (SUseDbMsg *)pMsg;
// int code; // int32_t code;
// //
// code = mgmtUseDb(pConn, pUse->db); // code = mgmtUseDb(pConn, pUse->db);
// if (code == 0) mTrace("DB is change to:%s by %s", pUse->db, pConn->pUser->user); // if (code == 0) mTrace("DB is change to:%s by %s", pUse->db, pConn->pUser->user);
@ -933,11 +933,11 @@ static void mgmtInitShowMsgFp() {
mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes; mgmtRetrieveFp[TSDB_MGMT_TABLE_VNODES] = mgmtRetrieveVnodes;
} }
int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessShowMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SShowMsg * pShowMsg = (SShowMsg *)pMsg; // SShowMsg * pShowMsg = (SShowMsg *)pMsg;
// STaosRsp * pRsp; // STaosRsp * pRsp;
// char * pStart; // char * pStart;
// int code = 0; // int32_t code = 0;
// SShowRspMsg *pShowRsp; // SShowRspMsg *pShowRsp;
// SShowObj * pShow = NULL; // SShowObj * pShow = NULL;
// //
@ -947,7 +947,7 @@ int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// } // }
// } // }
// //
// int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SShowRspMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS + // int32_t size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SShowRspMsg) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
// TSDB_EXTRA_PAYLOAD_SIZE; // TSDB_EXTRA_PAYLOAD_SIZE;
// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, size); // pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_SHOW_RSP, size);
// if (pStart == NULL) { // if (pStart == NULL) {
@ -992,12 +992,12 @@ int mgmtProcessShowMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessRetrieveMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SRetrieveMeterMsg *pRetrieve; // SRetrieveMeterMsg *pRetrieve;
// SRetrieveMeterRsp *pRsp; // SRetrieveMeterRsp *pRsp;
// int rowsToRead = 0, size = 0, rowsRead = 0; // int32_t rowsToRead = 0, size = 0, rowsRead = 0;
// char * pStart; // char * pStart;
// int code = 0; // int32_t code = 0;
// SShowObj * pShow; // SShowObj * pShow;
// //
// pRetrieve = (SRetrieveMeterMsg *)pMsg; // pRetrieve = (SRetrieveMeterMsg *)pMsg;
@ -1080,9 +1080,9 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessCreateTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SCreateTableMsg *pCreate = (SCreateTableMsg *)pMsg; // SCreateTableMsg *pCreate = (SCreateTableMsg *)pMsg;
// int code; // int32_t code;
// SSchema * pSchema; // SSchema * pSchema;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_TABLE_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CREATE_TABLE_RSP) != 0) {
@ -1097,7 +1097,7 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// //
// pCreate->sqlLen = htons(pCreate->sqlLen); // pCreate->sqlLen = htons(pCreate->sqlLen);
// pSchema = pCreate->schema; // pSchema = pCreate->schema;
// for (int i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) { // for (int32_t i = 0; i < pCreate->numOfColumns + pCreate->numOfTags; ++i) {
// pSchema->bytes = htons(pSchema->bytes); // pSchema->bytes = htons(pSchema->bytes);
// pSchema->colId = i; // pSchema->colId = i;
// pSchema++; // pSchema++;
@ -1135,9 +1135,9 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessDropTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SDropTableMsg *pDrop = (SDropTableMsg *)pMsg; // SDropTableMsg *pDrop = (SDropTableMsg *)pMsg;
// int code; // int32_t code;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_TABLE_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_DROP_TABLE_RSP) != 0) {
// return 0; // return 0;
@ -1161,9 +1161,9 @@ int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessAlterTableMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// SAlterTableMsg *pAlter = (SAlterTableMsg *)pMsg; // SAlterTableMsg *pAlter = (SAlterTableMsg *)pMsg;
// int code; // int32_t code;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_TABLE_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_ALTER_TABLE_RSP) != 0) {
// return 0; // return 0;
@ -1202,8 +1202,8 @@ int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
return 0; return 0;
} }
int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessCfgDnodeMsg(char *pMsg, int32_t msgLen, SConnObj *pConn) {
// int code = 0; // int32_t code = 0;
// SCfgMsg *pCfg = (SCfgMsg *)pMsg; // SCfgMsg *pCfg = (SCfgMsg *)pMsg;
// //
// if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CFG_MNODE_RSP) != 0) { // if (mgmtCheckRedirectMsg(pConn, TSDB_MSG_TYPE_CFG_MNODE_RSP) != 0) {
@ -1223,9 +1223,9 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// return 0; // return 0;
//} //}
// //
//int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) { //int32_t mgmtProcessHeartBeatMsg(char *cont, int32_t contLen, SConnObj *pConn) {
// char * pStart, *pMsg; // char * pStart, *pMsg;
// int msgLen; // int32_t msgLen;
// STaosRsp *pRsp; // STaosRsp *pRsp;
// //
// mgmtSaveQueryStreamList(cont, contLen, pConn); // mgmtSaveQueryStreamList(cont, contLen, pConn);
@ -1247,7 +1247,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// //
// if (pConn->usePublicIp) { // if (pConn->usePublicIp) {
// if (pSdbPublicIpList != NULL) { // if (pSdbPublicIpList != NULL) {
// int size = pSdbPublicIpList->numOfIps * 4; // int32_t size = pSdbPublicIpList->numOfIps * 4;
// pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps; // pHBRsp->ipList.numOfIps = pSdbPublicIpList->numOfIps;
// memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size); // memcpy(pHBRsp->ipList.ip, pSdbPublicIpList->ip, size);
// pMsg += sizeof(SHeartBeatRsp) + size; // pMsg += sizeof(SHeartBeatRsp) + size;
@ -1258,7 +1258,7 @@ int mgmtProcessCfgDnodeMsg(char *pMsg, int msgLen, SConnObj *pConn) {
// //
// } else { // } else {
// if (pSdbIpList != NULL) { // if (pSdbIpList != NULL) {
// int size = pSdbIpList->numOfIps * 4; // int32_t size = pSdbIpList->numOfIps * 4;
// pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps; // pHBRsp->ipList.numOfIps = pSdbIpList->numOfIps;
// memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size); // memcpy(pHBRsp->ipList.ip, pSdbIpList->ip, size);
// pMsg += sizeof(SHeartBeatRsp) + size; // pMsg += sizeof(SHeartBeatRsp) + size;
@ -1296,132 +1296,102 @@ void mgmtEstablishConn(SConnObj *pConn) {
// mgmtAddConnIntoAcct(pConn); // mgmtAddConnIntoAcct(pConn);
} }
int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SUserObj *pUser = NULL; SUserObj *pUser = mgmtGetUser(user);
if (pUser == NULL) {
*spi = 0; *spi = 0;
*encrypt = 0; *encrypt = 0;
secret[0] = 0; *ckey = 0;
ckey[0] = 0; *secret = 0;
return TSDB_CODE_INVALID_USER;
pUser = mgmtGetUser(user); }
if (pUser == NULL) return TSDB_CODE_INVALID_USER;
*spi = 1; *spi = 1;
*encrypt = 0; *encrypt = 0;
*ckey = 0;
memcpy(secret, pUser->pass, TSDB_KEY_LEN); memcpy(secret, pUser->pass, TSDB_KEY_LEN);
return TSDB_CODE_SUCCESS;
return 0;
} }
int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) { int32_t mgmtProcessConnectMsg(int8_t type, void *pCont, int32_t contLen, void *ahandle, int32_t code) {
// STaosRsp * pRsp; SCMConnectMsg *pConnectMsg = (SCMConnectMsg *) pCont;
// SConnectRsp *pConnectRsp; uint32_t destIp = 0;
// SConnectMsg *pConnectMsg; uint32_t srcIp = 0;
// char * pStart;
// int code = TSDB_CODE_INVALID_USER; SUserObj *pUser = mgmtGetUser(pConnectMsg->head.userId);
// SAcctObj * pAcct = NULL; if (pUser == NULL) {
// SUserObj * pUser = NULL; mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// SDbObj * pDb = NULL; rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
// char dbName[256] = {0}; return TSDB_CODE_INVALID_USER;
// }
// pConnectMsg = (SConnectMsg *)pMsg;
// if (mgmtCheckExpired()) {
// pUser = mgmtGetUser(pConn->user); mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// if (pUser == NULL) { rpcSendResponse(ahandle, TSDB_CODE_GRANT_EXPIRED, NULL, 0);
// code = TSDB_CODE_INVALID_USER; return TSDB_CODE_GRANT_EXPIRED;
// goto _rsp; }
// }
// SAcctObj *pAcct = mgmtGetAcct(pUser->acct);
// if (mgmtCheckExpired()) { if (pAcct == NULL) {
// code = TSDB_CODE_GRANT_EXPIRED; mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// goto _rsp; rpcSendResponse(ahandle, TSDB_CODE_INVALID_ACCT, NULL, 0);
// } return TSDB_CODE_INVALID_ACCT;
// }
// pAcct = mgmtGetAcct(pUser->acct);
// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3);
// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); if (code != TSDB_CODE_SUCCESS) {
// if (code != 0) { mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// mError("invalid client version:%s", pConnectMsg->clientVersion); rpcSendResponse(ahandle, code, NULL, 0);
// goto _rsp; return code;
// } }
//
// if (pConnectMsg->db[0]) { if (pConnectMsg->db[0]) {
// sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); char dbName[TSDB_TABLE_ID_LEN] = {0};
// pDb = mgmtGetDb(dbName); sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
// if (pDb == NULL) { SDbObj *pDb = mgmtGetDb(dbName);
// code = TSDB_CODE_INVALID_DB; if (pDb == NULL) {
// goto _rsp; mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// } rpcSendResponse(ahandle, TSDB_CODE_INVALID_DB, NULL, 0);
// } return TSDB_CODE_INVALID_DB;
// }
// if (pConn->pAcct) { }
// mgmtRemoveConnFromAcct(pConn);
// atomic_fetch_sub_32(&mgmtShellConns, 1); SCMConnectRsp *pConnectRsp = rpcMallocCont(sizeof(SCMConnectRsp));
// atomic_fetch_sub_32(&sdbExtConns, 1); if (pConnectRsp == NULL) {
// } mLError("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
// code = 0; return TSDB_CODE_SERV_OUT_OF_MEMORY;
// pConn->pAcct = pAcct; }
// pConn->pDb = pDb;
// pConn->pUser = pUser; sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
// mgmtEstablishConn(pConn); strcpy(pConnectRsp->serverVersion, version);
// pConnectRsp->writeAuth = pConn->writeAuth;
//_rsp: pConnectRsp->superAuth = pConn->superAuth;
// pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_CONNECT_RSP, 128);
// if (pStart == NULL) return 0; pConnectRsp->index = 0;
// pConnectRsp->usePublicIp = (destIp == tsPublicIpInt ? 1 : 0);
// pMsg = pStart; if (pSdbPublicIpList != NULL && pSdbIpList != NULL) {
// pRsp = (STaosRsp *)pMsg; pConnectRsp->numOfIps = htons(pSdbPublicIpList->numOfIps);
// pRsp->code = code; pConnectRsp->port = htons(tsMgmtShellPort);
// pMsg += sizeof(STaosRsp); if (pConnectRsp->usePublicIp) {
// for (int i = 0; i < pSdbPublicIpList->numOfIps; ++i) {
// if (code == 0) { pConnectRsp->ip[i] = htonl(pSdbPublicIpList->ip[i]);
// pConnectRsp = (SConnectRsp *)pRsp->more; }
// sprintf(pConnectRsp->acctId, "%x", pConn->pAcct->acctId); } else {
// strcpy(pConnectRsp->version, version); for (int i = 0; i < pSdbIpList->numOfIps; ++i) {
// pConnectRsp->writeAuth = pConn->writeAuth; pConnectRsp->ip[i] = htonl(pSdbIpList->ip[i]);
// pConnectRsp->superAuth = pConn->superAuth; }
// pMsg += sizeof(SConnectRsp); }
// } else {
// int size; pConnectRsp->numOfIps = 0;
// if (pSdbPublicIpList != NULL && pSdbIpList != NULL) { pConnectRsp->port = htons(tsMgmtShellPort);
// size = pSdbPublicIpList->numOfIps * 4 + sizeof(SIpList); }
// if (pConn->usePublicIp) {
// memcpy(pMsg, pSdbPublicIpList, size); mLPrint("user:%s login from %s to %s, code:%d", pConnectMsg->head.userId, taosIpStr(srcIp), taosIpStr(destIp), code);
// } else { return TSDB_CODE_SUCCESS;
// memcpy(pMsg, pSdbIpList, size);
// }
// } else {
// SIpList tmpIpList;
// tmpIpList.numOfIps = 0;
// size = tmpIpList.numOfIps * 4 + sizeof(SIpList);
// memcpy(pMsg, &tmpIpList, size);
// }
//
// pMsg += size;
//
// // set the time resolution: millisecond or microsecond
// *((uint32_t *)pMsg) = tsTimePrecision;
// pMsg += sizeof(uint32_t);
//
// } else {
// pConn->pAcct = NULL;
// pConn->pUser = NULL;
// }
//
// msgLen = pMsg - pStart;
// taosSendMsgToPeer(pConn->thandle, pStart, msgLen);
//
// char ipstr[24];
// tinet_ntoa(ipstr, pConn->ip);
// mLPrint("user:%s login from %s, code:%d", pConn->user, ipstr, code);
//
// return code;
return 0;
} }
void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle, int32_t code) { void mgmtProcessMsgFromShell(char type, void *pCont, int32_t contLen, void *ahandle, int32_t code) {
// SIntMsg * pMsg = (SIntMsg *)msg; // SIntMsg * pMsg = (SIntMsg *)msg;
// SConnObj *pConn = (SConnObj *)ahandle; // SConnObj *pConn = (SConnObj *)ahandle;
// //
@ -1472,7 +1442,7 @@ void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *ahandle,
// } // }
// //
// char *cont = (char *)pMsg->content + sizeof(SMgmtHead); // char *cont = (char *)pMsg->content + sizeof(SMgmtHead);
// int contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead); // int32_t contLen = pMsg->msgLen - sizeof(SIntMsg) - sizeof(SMgmtHead);
// //
// // read-only request can be executed concurrently // // read-only request can be executed concurrently
// if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) || // if ((pMsg->msgType == TSDB_MSG_TYPE_TABLE_META && (!mgmtCheckMeterMetaMsgType(cont))) ||

View File

@ -32,6 +32,7 @@ typedef struct {
uint32_t uid; // for unique ID inside a client uint32_t uid; // for unique ID inside a client
uint32_t sourceId; // source ID, an index for connection list uint32_t sourceId; // source ID, an index for connection list
uint32_t destId; // destination ID, an index for connection list uint32_t destId; // destination ID, an index for connection list
uint32_t destIp; // destination IP address, for NAT scenario
char user[TSDB_UNI_LEN]; char user[TSDB_UNI_LEN];
uint16_t port; // for UDP only, port may be changed uint16_t port; // for UDP only, port may be changed
char empty[1]; // reserved char empty[1]; // reserved

View File

@ -94,6 +94,7 @@ typedef struct _RpcConn {
uint16_t localPort; // for UDP only uint16_t localPort; // for UDP only
uint32_t peerUid; // peer UID uint32_t peerUid; // peer UID
uint32_t peerIp; // peer IP uint32_t peerIp; // peer IP
uint32_t destIp; // server destination IP to handle NAT
uint16_t peerPort; // peer port uint16_t peerPort; // peer port
char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string char peerIpstr[TSDB_IPv4ADDR_LEN]; // peer IP string
uint16_t tranId; // outgoing transcation ID, for build message uint16_t tranId; // outgoing transcation ID, for build message
@ -389,8 +390,9 @@ void rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
pInfo->sourceIp = pConn->peerIp; pInfo->clientIp = pConn->peerIp;
pInfo->sourcePort = pConn->peerPort; pInfo->clientPort = pConn->peerPort;
pInfo->serverIp = pConn->destIp;
strcpy(pInfo->user, pConn->user); strcpy(pInfo->user, pConn->user);
} }
@ -546,6 +548,7 @@ SRpcConn *rpcSetConnToServer(SRpcInfo *pRpc, SRpcIpSet ipSet) {
char ipstr[20] = {0}; char ipstr[20] = {0};
tinet_ntoa(ipstr, ipSet.ip[ipSet.index]); tinet_ntoa(ipstr, ipSet.ip[ipSet.index]);
pConn = rpcOpenConn(pRpc, ipstr, ipSet.port); pConn = rpcOpenConn(pRpc, ipstr, ipSet.port);
pConn->destIp = ipSet.ip[ipSet.index];
} }
return pConn; return pConn;
@ -772,11 +775,13 @@ static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
pHead = rpcDecompressRpcMsg(pHead); pHead = rpcDecompressRpcMsg(pHead);
int contLen = rpcContLenFromMsg(pHead->msgLen); int contLen = rpcContLenFromMsg(pHead->msgLen);
uint8_t *pCont = pHead->content; uint8_t *pCont = pHead->content;
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
pConn->destIp = pHead->destIp;
taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer); taosTmrReset(rpcProcessProgressTimer, tsRpcTimer/2, pConn, pRpc->tmrCtrl, &pConn->pTimer);
(*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0); (*(pRpc->cfp))(pHead->msgType, pCont, contLen, pConn, 0);
} else { } else {
@ -886,6 +891,7 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
pHead->tranId = pConn->tranId; pHead->tranId = pConn->tranId;
pHead->sourceId = pConn->ownId; pHead->sourceId = pConn->ownId;
pHead->destId = pConn->peerId; pHead->destId = pConn->peerId;
pHead->destIp = pConn->destIp;
pHead->port = 0; pHead->port = 0;
pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid()); pHead->uid = (uint32_t)((int64_t)pConn + (int64_t)getpid());
memcpy(pHead->user, pConn->user, tListLen(pHead->user)); memcpy(pHead->user, pConn->user, tListLen(pHead->user));