feat:[TD-32642] add userApp & userIp for connection support

This commit is contained in:
wangmm0220 2024-11-29 17:59:53 +08:00
parent 5ee80ac3bb
commit d52e9a1c5c
10 changed files with 264 additions and 23 deletions

View File

@ -65,11 +65,12 @@ typedef enum {
} TSDB_OPTION;
typedef enum {
TSDB_OPTION_CONNECTION_CLEAR = -1, // clear all option in this connection
TSDB_OPTION_CONNECTION_CHARSET, // charset, Same as the scope supported by the system
TSDB_OPTION_CONNECTION_TIMEZONE, // timezone, Same as the scope supported by the system
TSDB_OPTION_CONNECTION_USER_IP, // user ip
TSDB_OPTION_CONNECTION_USER_APP, // user app
TSDB_MAX_CONNECTION_OPTIONS = 100
TSDB_MAX_OPTIONS_CONNECTION
} TSDB_OPTION_CONNECTION;
typedef enum {

View File

@ -3421,6 +3421,8 @@ typedef struct {
SQueryHbReqBasic* query;
SHashObj* info; // hash<Skv.key, Skv>
char name[TSDB_APP_NAME_LEN];
char userApp[TSDB_APP_NAME_LEN];
uint32_t userIp;
} SClientHbReq;
typedef struct {

View File

@ -156,8 +156,8 @@ typedef struct {
typedef struct {
timezone_t timezone;
void *charsetCxt;
char app[TSDB_APP_NAME_LEN];
uint32_t ip;
char userApp[TSDB_APP_NAME_LEN];
uint32_t userIp;
}SOptionInfo;
typedef struct STscObj {

View File

@ -964,7 +964,7 @@ void taos_init_imp(void) {
#endif
if (taosCreateLog(logDirName, 10, configDir, NULL, NULL, NULL, NULL, 1) != 0) {
(void)printf(" WARING: Create %s failed:%s. configDir=%s\n", logDirName, strerror(errno), configDir);
tscInitRes = -1;
tscInitRes = terrno;
return;
}
@ -981,7 +981,7 @@ void taos_init_imp(void) {
ENV_ERR_RET(rpcInit(), "failed to init rpc");
if (InitRegexCache() != 0) {
tscInitRes = -1;
tscInitRes = terrno;
(void)printf("failed to init regex cache\n");
return;
}

View File

@ -1194,6 +1194,9 @@ int32_t hbGatherAllInfo(SAppHbMgr *pAppHbMgr, SClientHbBatchReq **pBatchReq) {
continue;
}
tstrncpy(pOneReq->userApp, pTscObj->optionInfo.userApp, sizeof(pOneReq->userApp));
pOneReq->userIp = pTscObj->optionInfo.userIp;
pOneReq = taosArrayPush((*pBatchReq)->reqs, pOneReq);
if (NULL == pOneReq) {
releaseTscObj(connKey->tscRid);

View File

@ -39,6 +39,9 @@ static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SSqlCallbackWrapper *pWrapper);
int taos_options(TSDB_OPTION option, const void *arg, ...) {
if (arg == NULL) {
return TSDB_CODE_INVALID_PARA;
}
static int32_t lock = 0;
for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) {
@ -117,35 +120,47 @@ static int32_t setConnectionOption(TAOS *taos, TSDB_OPTION_CONNECTION option, co
return TSDB_CODE_INVALID_PARA;
}
if (option != TSDB_MAX_CONNECTION_OPTIONS && (option < TSDB_OPTION_CONNECTION_CHARSET && option > TSDB_OPTION_CONNECTION_USER_APP)){
if (option < TSDB_OPTION_CONNECTION_CLEAR || option >= TSDB_MAX_OPTIONS_CONNECTION){
return TSDB_CODE_INVALID_PARA;
}
int32_t code = taos_init();
// initialize global config
if (code != 0) {
return code;
}
STscObj *pObj = acquireTscObj(*(int64_t *)taos);
if (NULL == pObj) {
tscError("invalid parameter for %s", __func__);
return terrno;
}
int32_t code = 0;
if (option == TSDB_OPTION_CONNECTION_CHARSET) {
if (option == TSDB_OPTION_CONNECTION_CLEAR){
val = NULL;
}
if (option == TSDB_OPTION_CONNECTION_CHARSET || option == TSDB_OPTION_CONNECTION_CLEAR) {
if (val != NULL) {
if (!taosValidateEncodec(val)) {
code = terrno;
goto END;
}
void *tmp = taosConvInit(val);
if (tmp == NULL) {
code = terrno;
goto END;
}
pObj->optionInfo.charsetCxt = tmp;
}else{
val = tsCharset;
pObj->optionInfo.charsetCxt = NULL;
}
void *tmp = taosConvInit(val);
if (tmp == NULL) {
code = terrno;
goto END;
}
pObj->optionInfo.charsetCxt = tmp;
} else if (option == TSDB_OPTION_CONNECTION_TIMEZONE) {
}
if (option == TSDB_OPTION_CONNECTION_TIMEZONE || option == TSDB_OPTION_CONNECTION_CLEAR) {
if (val != NULL){
if (strlen(val) == 0){
tscError("%s empty timezone %s", __func__, val);
code = TSDB_CODE_INVALID_PARA;
goto END;
}
@ -158,17 +173,25 @@ static int32_t setConnectionOption(TAOS *taos, TSDB_OPTION_CONNECTION option, co
} else {
pObj->optionInfo.timezone = NULL;
}
} else if (option == TSDB_OPTION_CONNECTION_USER_APP) {
}
if (option == TSDB_OPTION_CONNECTION_USER_APP || option == TSDB_OPTION_CONNECTION_CLEAR) {
if (val != NULL) {
tstrncpy(pObj->optionInfo.app, val, TSDB_APP_NAME_LEN);
tstrncpy(pObj->optionInfo.userApp, val, sizeof(pObj->optionInfo.userApp));
} else {
pObj->optionInfo.app[0] = 0;
pObj->optionInfo.userApp[0] = 0;
}
} else if (option == TSDB_OPTION_CONNECTION_USER_IP) {
}
if (option == TSDB_OPTION_CONNECTION_USER_IP || option == TSDB_OPTION_CONNECTION_CLEAR) {
if (val != NULL) {
pObj->optionInfo.ip = inet_addr(val);
pObj->optionInfo.userIp = inet_addr(val);
if (pObj->optionInfo.userIp == INADDR_NONE){
code = TSDB_CODE_INVALID_PARA;
goto END;
}
} else {
pObj->optionInfo.ip = 0;
pObj->optionInfo.userIp = INADDR_NONE;
}
}

View File

@ -27,7 +27,7 @@
#include "executor.h"
#include "taos.h"
#include "clientInt.h"
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
@ -165,6 +165,154 @@ void check_set_timezone(TAOS* optionFunc(const char *tz)){
}
}
#define CHECK_TAOS_OPTION_POINTER(taos, option, isnull) \
{ \
STscObj* pObj = acquireTscObj(*(int64_t*)taos); \
ASSERT(pObj != nullptr); \
if (isnull) { \
ASSERT(pObj->optionInfo.option == nullptr); \
} else { \
ASSERT(pObj->optionInfo.option != nullptr); \
} \
}
#define CHECK_TAOS_OPTION_APP(taos, option, val) \
{ \
STscObj* pObj = acquireTscObj(*(int64_t*)taos); \
ASSERT(pObj != nullptr); \
ASSERT(strcmp(pObj->optionInfo.option, val) == 0); \
}
#define CHECK_TAOS_OPTION_IP_ERROR(taos, option, val) \
{ \
STscObj* pObj = acquireTscObj(*(int64_t*)taos); \
ASSERT(pObj != nullptr); \
ASSERT(pObj->optionInfo.option == val); \
}
#define CHECK_TAOS_OPTION_IP(taos, option, val) \
{ \
STscObj* pObj = acquireTscObj(*(int64_t*)taos); \
ASSERT(pObj != nullptr); \
char ip[TD_IP_LEN] = {0}; \
tinet_ntoa(ip, pObj->optionInfo.option); \
ASSERT(strcmp(ip, val) == 0); \
}
TEST(timezoneCase, setConnectionOption_Test) {
int32_t code = taos_options_connection(NULL, TSDB_OPTION_CONNECTION_CHARSET, NULL);
ASSERT(code != 0);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT(pConn != nullptr);
code = taos_options_connection(pConn, TSDB_MAX_OPTIONS_CONNECTION, NULL);
ASSERT(code != 0);
// test charset
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_CHARSET, "");
ASSERT(code != 0);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_CHARSET, NULL);
ASSERT(code == 0);
CHECK_TAOS_OPTION_POINTER(pConn, charsetCxt, true);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_CHARSET, "Asia/Shanghai");
ASSERT(code != 0);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_CHARSET, "gbk");
ASSERT(code == 0);
CHECK_TAOS_OPTION_POINTER(pConn, charsetCxt, false);
// test timezone
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_TIMEZONE, "");
ASSERT(code != 0);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_TIMEZONE, NULL);
ASSERT(code == 0);
CHECK_TAOS_OPTION_POINTER(pConn, timezone, true);
check_sql_result(pConn, "select timezone()", "Asia/Shanghai (CST, +0800)");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_TIMEZONE, "UTC");
ASSERT(code == 0);
CHECK_TAOS_OPTION_POINTER(pConn, timezone, false);
check_sql_result(pConn, "select timezone()", "UTC (UTC, +0000)");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_TIMEZONE, "Asia/Kolkata");
ASSERT(code == 0);
CHECK_TAOS_OPTION_POINTER(pConn, timezone, false);
check_sql_result(pConn, "select timezone()", "Asia/Kolkata (IST, +0530)");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_TIMEZONE, "adbc");
ASSERT(code != 0);
// test user APP
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_APP, "");
ASSERT(code == 0);
CHECK_TAOS_OPTION_APP(pConn, userApp, "");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_APP, NULL);
ASSERT(code == 0);
CHECK_TAOS_OPTION_APP(pConn, userApp, "");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_APP, "aaaaaaaaaaaaaaaaaaaaaabbbbbbb");
ASSERT(code == 0);
CHECK_TAOS_OPTION_APP(pConn, userApp, "aaaaaaaaaaaaaaaaaaaaaab");
// test user IP
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_IP, "");
ASSERT(code != 0);
CHECK_TAOS_OPTION_IP_ERROR(pConn, userIp, INADDR_NONE);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_IP, NULL);
ASSERT(code == 0);
CHECK_TAOS_OPTION_IP_ERROR(pConn, userIp, INADDR_NONE);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_IP, "aaaaaaaaaaaaaaaaaaaaaabbbbbbb");
ASSERT(code != 0);
CHECK_TAOS_OPTION_IP_ERROR(pConn, userIp, INADDR_NONE);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_IP, "1292.168.0.2");
ASSERT(code != 0);
CHECK_TAOS_OPTION_IP_ERROR(pConn, userIp, INADDR_NONE);
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_IP, "192.168.0.2");
ASSERT(code == 0);
CHECK_TAOS_OPTION_IP(pConn, userIp, "192.168.0.2");
taosMsleep(2 * HEARTBEAT_INTERVAL);
//test user APP and user IP
check_sql_result(pConn, "select user_app from performance_schema.perf_connections", "aaaaaaaaaaaaaaaaaaaaaab");
check_sql_result(pConn, "select user_ip from performance_schema.perf_connections", "192.168.0.2");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_IP, "192.168.1.2");
ASSERT(code == 0);
CHECK_TAOS_OPTION_IP(pConn, userIp, "192.168.1.2");
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_USER_APP, "user");
ASSERT(code == 0);
CHECK_TAOS_OPTION_APP(pConn, userApp, "user");
taosMsleep(2 * HEARTBEAT_INTERVAL);
check_sql_result(pConn, "select user_app from performance_schema.perf_connections", "user");
check_sql_result(pConn, "select user_ip from performance_schema.perf_connections", "192.168.1.2");
// test clear
code = taos_options_connection(pConn, TSDB_OPTION_CONNECTION_CLEAR, "192.168.0.2");
ASSERT(code == 0);
CHECK_TAOS_OPTION_POINTER(pConn, charsetCxt, true);
CHECK_TAOS_OPTION_POINTER(pConn, timezone, true);
check_sql_result(pConn, "select timezone()", "Asia/Shanghai (CST, +0800)");
CHECK_TAOS_OPTION_APP(pConn, userApp, "");
CHECK_TAOS_OPTION_IP_ERROR(pConn, userIp, INADDR_NONE);
taos_close(pConn);
}
TEST(timezoneCase, set_timezone_Test) {
check_set_timezone(getConnWithGlobalOption);
check_set_timezone(getConnWithOption);

View File

@ -302,6 +302,8 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
TAOS_CHECK_RETURN(tEncodeSKv(pEncoder, kv));
pIter = taosHashIterate(pReq->info, pIter);
}
TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pReq->userIp));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pReq->userApp));
return 0;
}
@ -399,6 +401,10 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
return terrno = code;
}
}
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pReq->userIp));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pReq->userApp));
}
return 0;
}

View File

@ -501,6 +501,8 @@ static const SSysDbTableSchema connectionsSchema[] = {
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "login_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "last_access", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "user_app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false},
{.name = "user_ip", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema consumerSchema[] = {
@ -542,6 +544,8 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "sub_num", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "sub_status", .bytes = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "user_app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "user_ip", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema appSchema[] = {

View File

@ -45,6 +45,8 @@ typedef struct {
int32_t numOfQueries;
SRWLatch queryLock;
SArray *pQueries; // SArray<SQueryDesc>
char userApp[TSDB_APP_NAME_LEN];
uint32_t userIp;
} SConnObj;
typedef struct {
@ -135,6 +137,13 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
static void setUserInfo2Conn(SConnObj* connObj, char* userApp, uint32_t userIp){
if (connObj == NULL){
return;
}
tstrncpy(connObj->userApp, userApp, sizeof(connObj->userApp));
connObj->userIp = userIp;
}
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
@ -513,6 +522,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
}
setUserInfo2Conn(pConn, pHbReq->userApp, pHbReq->userIp);
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
if (rspBasic == NULL) {
mndReleaseConn(pMnode, pConn, true);
@ -921,6 +931,27 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
return code;
}
char userApp[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(userApp, pConn->userApp);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)userApp, false);
if (code != 0) {
mError("failed to set user app since %s", tstrerror(code));
return code;
}
char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE){
tinet_ntoa(varDataVal(userIp), pConn->userIp);
varDataLen(userIp) = strlen(varDataVal(userIp));
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, numOfRows, (const char *)userIp, false);
if (code != 0) {
mError("failed to set user ip since %s", tstrerror(code));
return code;
}
numOfRows++;
}
@ -1093,6 +1124,29 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
return code;
}
char userApp[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(userApp, pConn->userApp);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, curRowIndex, (const char *)userApp, false);
if (code != 0) {
mError("failed to set user app since %s", tstrerror(code));
taosRUnLockLatch(&pConn->queryLock);
return code;
}
char userIp[TD_IP_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
if (pConn->userIp != 0 && pConn->userIp != INADDR_NONE){
tinet_ntoa(varDataVal(userIp), pConn->userIp);
varDataLen(userIp) = strlen(varDataVal(userIp));
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, curRowIndex, (const char *)userIp, false);
if (code != 0) {
mError("failed to set user ip since %s", tstrerror(code));
taosRUnLockLatch(&pConn->queryLock);
return code;
}
pBlock->info.rows++;
}