Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

This commit is contained in:
Hongze Cheng 2021-12-14 16:16:28 +08:00
commit f7ee9ced05
71 changed files with 5482 additions and 552 deletions

View File

@ -1,9 +0,0 @@
# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.209.3/containers/cpp/.devcontainer/base.Dockerfile
# [Choice] Debian / Ubuntu version (use Debian 11/9, Ubuntu 18.04/21.04 on local arm64/Apple Silicon): debian-11, debian-10, debian-9, ubuntu-21.04, ubuntu-20.04, ubuntu-18.04
ARG VARIANT="bullseye"
FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
# [Optional] Uncomment this section to install additional packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here>

View File

@ -1,32 +0,0 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at:
// https://github.com/microsoft/vscode-dev-containers/tree/v0.209.3/containers/cpp
{
"name": "C++",
"build": {
"dockerfile": "Dockerfile",
// Update 'VARIANT' to pick an Debian / Ubuntu OS version: debian-11, debian-10, debian-9, ubuntu-21.04, ubuntu-20.04, ubuntu-18.04
// Use Debian 11, Debian 9, Ubuntu 18.04 or Ubuntu 21.04 on local arm64/Apple Silicon
"args": { "VARIANT": "ubuntu-21.04" }
},
"runArgs": ["--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined"],
// Set *default* container specific settings.json values on container create.
"settings": {},
// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"ms-vscode.cpptools",
"ms-vscode.cmake-tools",
"austin.code-gnu-global",
"visualstudioexptteam.vscodeintel"
],
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "gcc -v",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode"
}

View File

@ -73,10 +73,12 @@ typedef struct taosField {
#define DLL_EXPORT
#endif
DLL_EXPORT int taos_init();
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
@ -154,14 +156,14 @@ DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res);
// TAOS_RES *taos_list_dbs(TAOS *mysql, const char *wild);
// TODO: the return value should be `const`
DLL_EXPORT char *taos_get_server_info(TAOS *taos);
DLL_EXPORT char *taos_get_client_info();
DLL_EXPORT char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT const char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, int code), void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, int numOfRows), void *param);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval);

View File

@ -77,7 +77,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_STB, "create-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STB, "alter-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STB, "drop-stb" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STB_VGROUP, "stb-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_VGROUP_LIST, "vgroup-list" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
@ -214,6 +214,11 @@ typedef enum _mgmt_table {
extern char *taosMsg[];
typedef struct SBuildTableMetaInput {
int32_t vgId;
char *tableFullName;
} SBuildTableMetaInput;
#pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
@ -358,6 +363,7 @@ typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
int64_t startTime;
} SConnectMsg;
typedef struct SEpSet {
@ -368,19 +374,17 @@ typedef struct SEpSet {
} SEpSet;
typedef struct {
int32_t acctId;
int32_t clusterId;
int32_t connId;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t reserved[5];
SEpSet epSet;
int32_t acctId;
uint32_t clusterId;
int32_t connId;
int8_t superUser;
int8_t reserved[5];
SEpSet epSet;
} SConnectRsp;
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
int32_t maxUsers;
int32_t maxDbs;
int32_t maxTimeSeries;
@ -395,7 +399,7 @@ typedef struct {
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
} SCreateUserMsg, SAlterUserMsg;
typedef struct {
@ -774,9 +778,8 @@ typedef struct {
} SStbInfoMsg;
typedef struct {
SMsgHead msgHead;
char tableFname[TSDB_TABLE_FNAME_LEN];
int8_t createFlag;
char tags[];
} STableInfoMsg;
typedef struct {
@ -791,6 +794,20 @@ typedef struct SSTableVgroupMsg {
int32_t numOfTables;
} SSTableVgroupMsg, SSTableVgroupRspMsg;
typedef struct SVgroupInfo {
int32_t vgId;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SVgroupInfo;
typedef struct SVgroupListRspMsg {
int32_t vgroupNum;
int32_t vgroupVersion;
SVgroupInfo vgroupInfo[];
} SVgroupListRspMsg;
typedef SVgroupListRspMsg SVgroupListInfo;
typedef struct {
int32_t vgId;
int8_t numOfEps;
@ -961,8 +978,8 @@ typedef struct {
char user[TSDB_USER_LEN];
char spi;
char encrypt;
char secret[TSDB_KEY_LEN];
char ckey[TSDB_KEY_LEN];
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN];
} SAuthMsg, SAuthRsp;
typedef struct {

17
include/common/tep.h Normal file
View File

@ -0,0 +1,17 @@
#ifndef TDENGINE_TEP_H
#define TDENGINE_TEP_H
#include "os.h"
#include "taosmsg.h"
typedef struct SCorEpSet {
int32_t version;
SEpSet epSet;
} SCorEpSet;
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
#endif // TDENGINE_TEP_H

View File

@ -81,8 +81,6 @@ extern int64_t tsMaxRetentWindow;
// db parameters in client
extern int32_t tsCacheBlockSize;
extern int32_t tsBlocksPerVnode;
extern int32_t tsMinTablePerVnode;
extern int32_t tsMaxTablePerVnode;
extern int32_t tsTableIncStepPerVnode;
extern int32_t tsMaxVgroupsPerDb;
extern int16_t tsDaysPerFile;
@ -113,16 +111,8 @@ extern int8_t tsEnableSlaveQuery;
extern int8_t tsEnableAdjustMaster;
// restful
extern int8_t tsEnableHttpModule;
extern int32_t tsRestRowLimit;
extern uint16_t tsHttpPort;
extern int32_t tsHttpCacheSessions;
extern int32_t tsHttpSessionExpire;
extern int32_t tsHttpMaxThreads;
extern int8_t tsHttpEnableCompress;
extern int8_t tsHttpEnableRecordSql;
extern int8_t tsTelegrafUseFieldNum;
extern int8_t tsHttpDbNameMandatory;
// mqtt
extern int8_t tsEnableMqttModule;
@ -144,7 +134,6 @@ extern int8_t tsEnableStream;
// internal
extern int8_t tsPrintAuth;
extern int8_t tscEmbedded;
extern char tsVnodeDir[];
extern char tsMnodeDir[];
extern int64_t tsTickPerDay[3];
@ -193,7 +182,7 @@ extern SDiskCfg tsDiskCfg[];
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
void taosInitGlobalCfg();
int32_t taosCheckGlobalCfg();
int32_t taosCheckAndPrintCfg();
int32_t taosCfgDynamicOptions(char *msg);
bool taosCheckBalanceCfgOptions(const char *option, int32_t *vnodeId, int32_t *dnodeId);
void taosAddDataDir(int index, char *v1, int level, int primary);

32
include/common/tmessage.h Normal file
View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_TMESSAGE_H_
#define _TD_COMMON_TMESSAGE_H_
#ifdef __cplusplus
extern "C" {
#endif
extern int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_TMESSAGE_H_*/

View File

@ -30,16 +30,23 @@ extern "C" {
struct SCatalog;
typedef struct SMetaReq {
char clusterId[TSDB_CLUSTER_ID_LEN];
typedef struct SDBVgroupInfo {
int32_t vgroupVersion;
SArray *vgId;
int32_t hashRange;
int32_t hashNum;
} SDBVgroupInfo;
typedef struct SCatalogReq {
char clusterId[TSDB_CLUSTER_ID_LEN]; //????
SArray *pTableName; // table full name
SArray *pUdf; // udf name
bool qNodeEpset; // valid qnode
} SMetaReq;
bool qNodeRequired; // valid qnode
} SCatalogReq;
typedef struct SMetaData {
SArray *pTableMeta; // tableMeta
SArray *pVgroupInfo; // vgroupInfo list
SArray *pTableMeta; // STableMeta array
SArray *pVgroupInfo; // SVgroupInfo list
SArray *pUdfList; // udf info list
SEpSet *pEpSet; // qnode epset list
} SMetaData;
@ -78,32 +85,71 @@ typedef struct STableMeta {
SSchema schema[];
} STableMeta;
typedef struct SCatalogCfg {
} SCatalogCfg;
int32_t catalogInit(SCatalogCfg *cfg);
/**
* Catalog service object, which is utilized to hold tableMeta (meta/vgroupInfo/udfInfo) at the client-side.
* There is ONLY one SCatalog object for one process space, and this function returns a singleton.
* @param pMgmtEps
* @param clusterId
* @return
*/
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps);
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle);
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version);
int32_t catalogGetVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList);
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup);
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version);
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo);
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo);
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta);
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta);
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const STableMeta* pTableMeta, STableMeta* pNewTableMeta);
/**
* get table's vgroup list.
* @param clusterId
* @pVgroupList - array of SVgroupInfo
* @return
*/
int32_t catalogGetTableVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, SArray* pVgroupList);
/**
* Get the required meta data from mnode.
* Note that this is a synchronized API and is also thread-safety.
* @param pCatalog
* @param pMgmtEps
* @param pMetaReq
* @param pMetaData
* @return
*/
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData);
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
/**
* Destroy catalog service handle
* Destroy catalog and relase all resources
* @param pCatalog
*/
void destroyCatalog(struct SCatalog* pCatalog);
void catalogDestroy(void);
#ifdef __cplusplus
}
#endif
#endif /*_TD_CATALOG_H_*/
#endif /*_TD_CATALOG_H_*/

View File

@ -29,11 +29,6 @@ extern "C" {
extern int tsRpcHeadSize;
typedef struct SRpcCorEpSet {
int32_t version;
SEpSet epSet;
} SRpcCorEpSet;
typedef struct SRpcConnInfo {
uint32_t clientIp;
uint16_t clientPort;

View File

@ -32,6 +32,23 @@ extern int32_t wDebugFlag;
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }}
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
#pragma pack(push,1)
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
@ -43,6 +60,7 @@ typedef struct SWalReadHead {
uint8_t msgType;
int8_t reserved[2];
int32_t len;
//int64_t ingestTs; //not implemented
int64_t version;
char body[];
} SWalReadHead;
@ -71,25 +89,6 @@ typedef struct {
SWalReadHead head;
} SWalHead;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
typedef struct SWalVer {
int64_t firstVer;
int64_t verInSnapshotting;
@ -101,24 +100,18 @@ typedef struct SWalVer {
typedef struct SWal {
// cfg
SWalCfg cfg;
//total size
int64_t totSize;
//fsync seq
int32_t fsyncSeq;
//reference
int64_t refId;
//write tfd
int64_t writeLogTfd;
int64_t writeIdxTfd;
//wal lifecycle
SWalVer vers;
//roll status
int64_t lastRollSeq;
//file set
int32_t writeCur;
int64_t writeLogTfd;
int64_t writeIdxTfd;
SArray* fileInfoSet;
//ctl
int32_t curStatus;
int32_t fsyncSeq;
int64_t totSize;
int64_t refId;
int64_t lastRollSeq;
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
@ -134,8 +127,9 @@ typedef struct SWalReadHandle {
int64_t curVersion;
int64_t capacity;
int64_t status; //if cursor valid
SWalHead head;
SWalHead* pHead;
} SWalReadHandle;
#pragma pack(pop)
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);

View File

@ -57,7 +57,7 @@ int64_t taosGetPthreadId(pthread_t thread);
void taosResetPthread(pthread_t* thread);
bool taosComparePthread(pthread_t first, pthread_t second);
int32_t taosGetPId();
int32_t taosGetCurrentAPPName(char* name, int32_t* len);
int32_t taosGetAppName(char* name, int32_t* len);
#ifdef __cplusplus
}

View File

@ -117,6 +117,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x0221) //"Invalid JSON format")
#define TSDB_CODE_TSC_INVALID_JSON_TYPE TAOS_DEF_ERROR_CODE(0, 0x0222) //"Invalid JSON data type")
#define TSDB_CODE_TSC_VALUE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0223) //"Value out of range")
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300)
@ -501,6 +502,13 @@ int32_t* taosGetErrno();
// monitor
#define TSDB_CODE_MON_CONNECTION_INVALID TAOS_DEF_ERROR_CODE(0, 0x2300) //"monitor invalid monitor db connection")
// catalog
#define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400) //catalog interval error
#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401) //invalid catalog input parameters
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error
#ifdef __cplusplus
}
#endif

View File

@ -20,7 +20,7 @@
extern "C" {
#endif
#define TSDB_CFG_MAX_NUM 123
#define TSDB_CFG_MAX_NUM 115
#define TSDB_CFG_PRINT_LEN 23
#define TSDB_CFG_OPTION_LEN 24
#define TSDB_CFG_VALUE_LEN 41
@ -83,11 +83,11 @@ extern int32_t tsGlobalConfigNum;
extern char * tsCfgStatusStr[];
void taosReadGlobalLogCfg();
int32_t taosReadGlobalCfg();
void taosPrintGlobalCfg();
int32_t taosReadCfgFromFile();
void taosPrintCfg();
void taosDumpGlobalCfg();
void taosInitConfigOption(SGlobalCfg cfg);
void taosAddConfigOption(SGlobalCfg cfg);
SGlobalCfg *taosGetConfigOption(const char *option);
#ifdef __cplusplus

View File

@ -161,7 +161,7 @@ do { \
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33
#define TSDB_DB_NAME_LEN 65
#define TSDB_FULL_DB_NAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN)
#define TSDB_FUNC_NAME_LEN 65
@ -193,7 +193,7 @@ do { \
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_AUTH_LEN 16
#define TSDB_KEY_LEN 16
#define TSDB_PASSWORD_LEN 32
#define TSDB_VERSION_LEN 12
#define TSDB_LABEL_LEN 8

View File

@ -45,6 +45,8 @@ extern int32_t sDebugFlag;
extern int32_t tsdbDebugFlag;
extern int32_t cqDebugFlag;
extern int32_t debugFlag;
extern int32_t ctgDebugFlag;
#define DEBUG_FATAL 1U
#define DEBUG_ERROR DEBUG_FATAL

View File

@ -45,14 +45,25 @@ char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, inBuf, (unsigned int)inLen);
tMD5Final(&context);
memcpy(target, context.digest, TSDB_KEY_LEN);
memcpy(target, context.digest, tListLen(context.digest));
}
static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *target) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, inBuf, (unsigned int)len);
tMD5Final(&context);
sprintf(target, "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x", context.digest[0], context.digest[1], context.digest[2],
context.digest[3], context.digest[4], context.digest[5], context.digest[6], context.digest[7],
context.digest[8], context.digest[9], context.digest[10], context.digest[11], context.digest[12],
context.digest[13], context.digest[14], context.digest[15]);
}
#ifdef __cplusplus

View File

@ -2,9 +2,14 @@ aux_source_directory(src CLIENT_SRC)
add_library(taos ${CLIENT_SRC})
target_include_directories(
taos
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${CMAKE_SOURCE_DIR}/include/client"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
taos
PRIVATE common
INTERFACE api
PRIVATE os util common transport parser
)
ADD_SUBDIRECTORY(test)

View File

@ -0,0 +1,146 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_CLIENTINT_H
#define TDENGINE_CLIENTINT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taos.h"
#include "taosmsg.h"
#include "thash.h"
#include "tlist.h"
#include "trpc.h"
#include "tdef.h"
#include "tmsgtype.h"
#include "tep.h"
typedef struct SQueryExecMetric {
int64_t start; // start timestamp
int64_t parsed; // start to parse
int64_t send; // start to send to server
int64_t rsp; // receive response from server
} SQueryExecMetric;
typedef struct SInstanceActivity {
uint64_t numOfInsertsReq;
uint64_t numOfInsertRows;
uint64_t insertElapsedTime;
uint64_t insertBytes; // submit to tsdb since launched.
uint64_t fetchBytes;
uint64_t queryElapsedTime;
uint64_t numOfSlowQueries;
uint64_t totalRequests;
uint64_t currentRequests; // the number of SRequestObj
} SInstanceActivity;
typedef struct SHeartBeatInfo {
void *pTimer; // timer, used to send request msg to mnode
} SHeartBeatInfo;
typedef struct SAppInstInfo {
int64_t numOfConns;
SCorEpSet mgmtEp;
SInstanceActivity summary;
SList *pConnList; // STscObj linked list
uint32_t clusterId;
void *pTransporter;
} SAppInstInfo;
typedef struct SAppInfo {
int64_t startTime;
char appName[TSDB_APP_NAME_LEN];
char *ep;
int32_t pid;
int32_t numOfThreads;
SHeartBeatInfo hb;
SHashObj *pInstMap;
} SAppInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char acctId[TSDB_ACCT_ID_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
uint32_t connId;
uint64_t id; // ref ID returned by taosAddRef
// struct SSqlObj *sqlList;
void *pTransporter;
pthread_mutex_t mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj from this tscObj
SAppInstInfo *pAppInfo;
} STscObj;
typedef struct SReqBody {
tsem_t rspSem; // not used now
void* fp;
void* param;
} SRequestBody;
typedef struct SRequestObj {
uint64_t requestId;
int32_t type; // request type
STscObj *pTscObj;
SQueryExecMetric metric;
char *sqlstr; // sql string
SRequestBody body;
int64_t self;
char *msgBuf;
int32_t code;
void *pInfo; // sql parse info, generated by parser module
} SRequestObj;
typedef struct SRequestMsgBody {
int32_t msgType;
void *pData;
int32_t msgLen;
uint64_t requestId;
uint64_t requestObjRefId;
} SRequestMsgBody;
extern SAppInfo appInfo;
extern int32_t tscReqRef;
extern void *tscQhandle;
extern int32_t tscConnRef;
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsg);
extern int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);
int taos_init();
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo);
void destroyTscObj(void*pObj);
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type);
void destroyRequest(SRequestObj* pRequest);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port);
void taos_init_imp(void);
int taos_options_imp(TSDB_OPTION option, const char *str);
void* openTransporter(const char *user, const char *auth);
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
void initMsgHandleFp();
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_CLIENTINT_H

View File

@ -0,0 +1,37 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCLOG_H
#define TDENGINE_TSCLOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tlog.h"
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif

View File

@ -13,11 +13,51 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#include "taos.h"
#include "os.h"
#include "tdef.h"
#include "tglobal.h"
#include "clientInt.h"
#include "tscLog.h"
//TAOS_RES *taos_query(TAOS *taos, const char *sql) {
//
//}
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
int32_t p = (port != 0)? port:tsServerPort;
tscDebug("try to connect to %s:%u, user:%s db:%s", ip, p, user, db);
if (user == NULL) {
user = TSDB_DEFAULT_USER;
}
if (pass == NULL) {
pass = TSDB_DEFAULT_PASS;
}
return taos_connect_internal(ip, user, pass, NULL, db, p);
}
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
if (user == NULL) {
user = TSDB_DEFAULT_USER;
}
if (auth == NULL) {
tscError("No auth info is given, failed to connect to server");
return NULL;
}
return taos_connect_internal(ip, user, NULL, auth, db, port);
}
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
char ipStr[TSDB_EP_LEN] = {0};
char dbStr[TSDB_DB_NAME_LEN] = {0};
char userStr[TSDB_USER_LEN] = {0};
char passStr[TSDB_PASSWORD_LEN] = {0};
strncpy(ipStr, ip, MIN(TSDB_EP_LEN - 1, ipLen));
strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
strncpy(dbStr, db, MIN(TSDB_DB_NAME_LEN - 1, dbLen));
return taos_connect(ipStr, userStr, passStr, dbStr, port);
}
int taos_init() { return 0; }
void taos_cleanup(void) {}

View File

@ -0,0 +1,283 @@
#include <tpagedfile.h>
#include "clientInt.h"
#include "tdef.h"
#include "tep.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tref.h"
#include "tscLog.h"
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
static void destroyConnectMsg(SRequestMsgBody* pMsgBody);
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) {
return false;
}
size_t len = strlen(str);
if (len <= 0 || len > maxsize) {
return false;
}
return true;
}
static bool validateUserName(const char* user) {
return stringLengthCheck(user, TSDB_USER_LEN - 1);
}
static bool validatePassword(const char* passwd) {
return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
}
static bool validateDbName(const char* db) {
return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
char key[512] = {0};
snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
return strdup(key);
}
static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
if (taos_init() != TSDB_CODE_SUCCESS) {
return NULL;
}
if (!validateUserName(user)) {
terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
return NULL;
}
char tmp[TSDB_DB_NAME_LEN] = {0};
if (db != NULL) {
if(!validateDbName(db)) {
terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
return NULL;
}
tstrncpy(tmp, db, sizeof(tmp));
strdequote(tmp);
}
char secretEncrypt[32] = {0};
if (auth == NULL) {
if (!validatePassword(pass)) {
terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
return NULL;
}
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
} else {
tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
}
SCorEpSet epSet = {0};
if (ip) {
if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
return NULL;
}
if (port) {
epSet.epSet.port[0] = port;
}
} else {
if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
return NULL;
}
}
char* key = getClusterKey(user, secretEncrypt, ip, port);
SAppInstInfo* pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
if (pInst == NULL) {
pInst = calloc(1, sizeof(struct SAppInstInfo));
pInst->mgmtEp = epSet;
pInst->pTransporter = openTransporter(user, secretEncrypt);
taosHashPut(appInfo.pInstMap, key, strlen(key), &pInst, POINTER_BYTES);
}
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, pInst);
}
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
pEpSet->version = 0;
// init mgmt ip set
SEpSet *mgmtEpSet = &(pEpSet->epSet);
mgmtEpSet->numOfEps = 0;
mgmtEpSet->inUse = 0;
if (firstEp && firstEp[0] != 0) {
if (strlen(firstEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
mgmtEpSet->numOfEps++;
}
if (secondEp && secondEp[0] != 0) {
if (strlen(secondEp) >= TSDB_EP_LEN) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
mgmtEpSet->numOfEps++;
}
if (mgmtEpSet->numOfEps == 0) {
terrno = TSDB_CODE_TSC_INVALID_FQDN;
return -1;
}
return 0;
}
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
STscObj *pTscObj = createTscObj(user, auth, ip, port, pAppInfo);
if (NULL == pTscObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return pTscObj;
}
SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_SQL_CONNECT);
if (pRequest == NULL) {
destroyTscObj(pTscObj);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
SRequestMsgBody body = {0};
buildConnectMsg(pRequest, &body);
int64_t transporterId = 0;
sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId);
tsem_wait(&pRequest->body.rspSem);
destroyConnectMsg(&body);
if (pRequest->code != TSDB_CODE_SUCCESS) {
const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
printf("failed to connect to server, reason: %s\n\n", errorMsg);
destroyRequest(pRequest);
taos_close(pTscObj);
pTscObj = NULL;
} else {
tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
destroyRequest(pRequest);
}
return pTscObj;
}
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
pMsgBody->msgType = TSDB_MSG_TYPE_CONNECT;
pMsgBody->msgLen = sizeof(SConnectMsg);
pMsgBody->requestObjRefId = pRequest->self;
SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
if (pConnect == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
// TODO refactor full_name
char *db; // ugly code to move the space
STscObj *pObj = pRequest->pTscObj;
pthread_mutex_lock(&pObj->mutex);
db = strstr(pObj->db, TS_PATH_DELIMITER);
db = (db == NULL) ? pObj->db : db + 1;
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
pthread_mutex_unlock(&pObj->mutex);
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
pMsgBody->pData = pConnect;
return 0;
}
static void destroyConnectMsg(SRequestMsgBody* pMsgBody) {
assert(pMsgBody != NULL);
tfree(pMsgBody->pData);
}
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
char *pMsg = rpcMallocCont(pBody->msgLen);
if (NULL == pMsg) {
tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return -1;
}
memcpy(pMsg, pBody->pData, pBody->msgLen);
SRpcMsg rpcMsg = {
.msgType = pBody->msgType,
.pCont = pMsg,
.contLen = pBody->msgLen,
.ahandle = (void*) pBody->requestObjRefId,
.handle = NULL,
.code = 0
};
rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
return TSDB_CODE_SUCCESS;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int64_t requestRefId = (int64_t)pMsg->ahandle;
SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId);
if (pRequest == NULL) {
rpcFreeCont(pMsg->pCont);
return;
}
assert(pRequest->self == requestRefId);
pRequest->metric.rsp = taosGetTimestampMs();
pRequest->code = pMsg->code;
STscObj *pTscObj = pRequest->pTscObj;
if (pEpSet) {
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
}
}
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
if (handleRequestRspFp[pRequest->type]) {
pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, pMsg->pCont, pMsg->contLen);
}
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d", pRequest->requestId, taosMsg[pMsg->msgType],
tstrerror(pMsg->code), pMsg->contLen);
}
taosReleaseRef(tscReqRef, requestRefId);
rpcFreeCont(pMsg->pCont);
sem_post(&pRequest->body.rspSem);
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,85 @@
#include "clientInt.h"
#include "trpc.h"
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "tscLog.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimezone.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
static int32_t sentinel = TSC_VAR_NOT_RELEASE;
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
extern int32_t tscInitRes;
int taos_options(TSDB_OPTION option, const void *arg, ...) {
static int32_t lock = 0;
for (int i = 1; atomic_val_compare_exchange_32(&lock, 0, 1) != 0; ++i) {
if (i % 1000 == 0) {
tscInfo("haven't acquire lock after spin %d times.", i);
sched_yield();
}
}
int ret = taos_options_imp(option, (const char*)arg);
atomic_store_32(&lock, 0);
return ret;
}
int taos_init() {
pthread_once(&tscinit, taos_init_imp);
return tscInitRes;
}
// this function may be called by user or system, or by both simultaneously.
void taos_cleanup(void) {
tscDebug("start to cleanup client environment");
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return;
}
int32_t id = tscReqRef;
tscReqRef = -1;
taosCloseRef(id);
void* p = tscQhandle;
tscQhandle = NULL;
taosCleanUpScheduler(p);
id = tscConnRef;
tscConnRef = -1;
taosCloseRef(id);
rpcCleanup();
taosCloseLog();
}
void taos_close(TAOS* taos) {
if (taos == NULL) {
return;
}
STscObj *pTscObj = (STscObj *)taos;
tscDebug("0x%"PRIx64" try to close connection, numOfReq:%d", pTscObj->id, pTscObj->numOfReqs);
taosRemoveRef(tscConnRef, pTscObj->id);
}
const char *taos_errstr(TAOS_RES *res) {
}
void taos_free_result(TAOS_RES *res) {
}

471
source/client/src/tscEnv.c Normal file
View File

@ -0,0 +1,471 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taosmsg.h"
#include "tcache.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tnote.h"
#include "tref.h"
#include "tscLog.h"
#include "tsched.h"
#include "ttime.h"
#include "trpc.h"
#include "ttimezone.h"
#include "clientInt.h"
#define TSC_VAR_NOT_RELEASE 1
#define TSC_VAR_RELEASED 0
SAppInfo appInfo;
int32_t tscReqRef = -1;
int32_t tscConnRef = -1;
void *tscQhandle = NULL;
int32_t tsNumOfThreads = 1;
volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj* pRequest) {
STscObj *pTscObj = (STscObj *)taosAcquireRef(tscConnRef, pRequest->pTscObj->id);
assert(pTscObj != NULL);
// connection has been released already, abort creating request.
pRequest->self = taosAddRef(tscReqRef, pRequest);
int32_t num = atomic_add_fetch_32(&pTscObj->numOfReqs, 1);
if (pTscObj->pAppInfo) {
SInstanceActivity *pActivity = &pTscObj->pAppInfo->summary;
int32_t total = atomic_add_fetch_32(&pActivity->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_32(&pActivity->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 ", current:%d, app current:%d, total:%d", pRequest->self,
pRequest->pTscObj->id, num, currentInst, total);
}
}
static void deregisterRequest(SRequestObj* pRequest) {
assert(pRequest != NULL);
STscObj* pTscObj = pRequest->pTscObj;
SInstanceActivity* pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_32(&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
tscDebug("0x%"PRIx64" free Request from connObj: 0x%"PRIx64", current:%d, app current:%d", pRequest->self, pTscObj->id, num, currentInst);
taosReleaseRef(tscConnRef, pTscObj->id);
}
static void tscInitLogFile() {
taosReadGlobalLogCfg();
if (mkdir(tsLogDir, 0755) != 0 && errno != EEXIST) {
printf("failed to create log dir:%s\n", tsLogDir);
}
const char *defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
void closeTransporter(STscObj* pTscObj) {
if (pTscObj == NULL || pTscObj->pTransporter == NULL) {
return;
}
tscDebug("free transporter:%p in connObj: 0x%"PRIx64, pTscObj->pTransporter, pTscObj->id);
rpcClose(pTscObj->pTransporter);
pTscObj->pTransporter = NULL;
}
// TODO refactor
void* openTransporter(const char *user, const char *auth) {
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC";
rpcInit.numOfThreads = tsNumOfThreads;
rpcInit.cfp = processMsgFromServer;
rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)user;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.ckey = "key";
// rpcInit.spi = 1;
rpcInit.secret = (char *)auth;
void* pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
return NULL;
}
return pDnodeConn;
}
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
tscDebug("connObj 0x%"PRIx64" destroyed, totalConn:%"PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
closeTransporter(pTscObj);
pthread_mutex_destroy(&pTscObj->mutex);
tfree(pTscObj);
}
void* createTscObj(const char* user, const char* auth, const char *ip, uint32_t port, SAppInstInfo* pAppInfo) {
STscObj *pObj = (STscObj *)calloc(1, sizeof(STscObj));
if (NULL == pObj) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pObj->pAppInfo = pAppInfo;
if (pAppInfo != NULL) {
pObj->pTransporter = pAppInfo->pTransporter;
}
tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
pthread_mutex_init(&pObj->mutex, NULL);
pObj->id = taosAddRef(tscConnRef, pObj);
tscDebug("connObj created, 0x%"PRIx64, pObj->id);
return pObj;
}
void* createRequest(STscObj* pObj, __taos_async_fn_t fp, void* param, int32_t type) {
assert(pObj != NULL);
SRequestObj *pRequest = (SRequestObj *)calloc(1, sizeof(SRequestObj));
if (NULL == pRequest) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
// TODO generated request uuid
pRequest->requestId = 0;
pRequest->metric.start = taosGetTimestampMs();
pRequest->type = type;
pRequest->pTscObj = pObj;
pRequest->body.fp = fp;
pRequest->body.param = param;
tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest);
return pRequest;
}
static void doDestroyRequest(void* p) {
assert(p != NULL);
SRequestObj* pRequest = (SRequestObj*)p;
assert(RID_VALID(pRequest->self));
tfree(pRequest->msgBuf);
tfree(pRequest->sqlstr);
tfree(pRequest->pInfo);
deregisterRequest(pRequest);
tfree(pRequest);
}
void destroyRequest(SRequestObj* pRequest) {
if (pRequest == NULL) {
return;
}
taosReleaseRef(tscReqRef, pRequest->self);
}
void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet.
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
atexit(taos_cleanup);
errno = TSDB_CODE_SUCCESS;
srand(taosGetTimestampSec());
deltaToUtcInitOnce();
taosInitGlobalCfg();
taosReadCfgFromFile();
tscInitLogFile();
if (taosCheckAndPrintCfg()) {
tscInitRes = -1;
return;
}
taosInitNotes();
initMsgHandleFp();
rpcInit();
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
taosSetCoreDump(true);
double factor = 4.0;
int32_t numOfThreads = MAX((int)(tsNumOfCores * tsNumOfThreadsPerCore / factor), 2);
int32_t queueSize = tsMaxConnections * 2;
tscQhandle = taosInitScheduler(queueSize, numOfThreads, "tsc");
if (NULL == tscQhandle) {
tscError("failed to init task queue");
tscInitRes = -1;
return;
}
tscDebug("client task queue is initialized, numOfThreads: %d", numOfThreads);
tscConnRef = taosOpenRef(200, destroyTscObj);
tscReqRef = taosOpenRef(40960, doDestroyRequest);
taosGetAppName(appInfo.appName, NULL);
appInfo.pid = taosGetPId();
appInfo.startTime = taosGetTimestampMs();
appInfo.pInstMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
tscDebug("client is initialized successfully");
}
int taos_options_imp(TSDB_OPTION option, const char *str) {
SGlobalCfg *cfg = NULL;
switch (option) {
case TSDB_OPTION_CONFIGDIR:
cfg = taosGetConfigOption("configDir");
assert(cfg != NULL);
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
tstrncpy(configDir, str, TSDB_FILENAME_LEN);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscInfo("set config file directory:%s", str);
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
case TSDB_OPTION_SHELL_ACTIVITY_TIMER:
cfg = taosGetConfigOption("shellActivityTimer");
assert(cfg != NULL);
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
tsShellActivityTimer = atoi(str);
if (tsShellActivityTimer < 1) tsShellActivityTimer = 1;
if (tsShellActivityTimer > 3600) tsShellActivityTimer = 3600;
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscInfo("set shellActivityTimer:%d", tsShellActivityTimer);
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %d", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], *(int32_t *)cfg->ptr);
}
break;
case TSDB_OPTION_LOCALE: { // set locale
cfg = taosGetConfigOption("locale");
assert(cfg != NULL);
size_t len = strlen(str);
if (len == 0 || len > TSDB_LOCALE_LEN) {
tscInfo("Invalid locale:%s, use default", str);
return -1;
}
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
char sep = '.';
if (strlen(tsLocale) == 0) { // locale does not set yet
char* defaultLocale = setlocale(LC_CTYPE, "");
// The locale of the current OS does not be set correctly, so the default locale cannot be acquired.
// The launch of current system will abort soon.
if (defaultLocale == NULL) {
tscError("failed to get default locale, please set the correct locale in current OS");
return -1;
}
tstrncpy(tsLocale, defaultLocale, TSDB_LOCALE_LEN);
}
// set the user specified locale
char *locale = setlocale(LC_CTYPE, str);
if (locale != NULL) { // failed to set the user specified locale
tscInfo("locale set, prev locale:%s, new locale:%s", tsLocale, locale);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
} else { // set the user specified locale failed, use default LC_CTYPE as current locale
locale = setlocale(LC_CTYPE, tsLocale);
tscInfo("failed to set locale:%s, current locale:%s", str, tsLocale);
}
tstrncpy(tsLocale, locale, TSDB_LOCALE_LEN);
char *charset = strrchr(tsLocale, sep);
if (charset != NULL) {
charset += 1;
charset = taosCharsetReplace(charset);
if (taosValidateEncodec(charset)) {
if (strlen(tsCharset) == 0) {
tscInfo("charset set:%s", charset);
} else {
tscInfo("charset changed from %s to %s", tsCharset, charset);
}
tstrncpy(tsCharset, charset, TSDB_LOCALE_LEN);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
} else {
tscInfo("charset:%s is not valid in locale, charset remains:%s", charset, tsCharset);
}
free(charset);
} else { // it may be windows system
tscInfo("charset remains:%s", tsCharset);
}
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
}
case TSDB_OPTION_CHARSET: {
/* set charset will override the value of charset, assigned during system locale changed */
cfg = taosGetConfigOption("charset");
assert(cfg != NULL);
size_t len = strlen(str);
if (len == 0 || len > TSDB_LOCALE_LEN) {
tscInfo("failed to set charset:%s", str);
return -1;
}
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
if (taosValidateEncodec(str)) {
if (strlen(tsCharset) == 0) {
tscInfo("charset is set:%s", str);
} else {
tscInfo("charset changed from %s to %s", tsCharset, str);
}
tstrncpy(tsCharset, str, TSDB_LOCALE_LEN);
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
} else {
tscInfo("charset:%s not valid", str);
}
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
}
case TSDB_OPTION_TIMEZONE:
cfg = taosGetConfigOption("timezone");
assert(cfg != NULL);
if (cfg->cfgStatus <= TAOS_CFG_CSTATUS_OPTION) {
tstrncpy(tsTimezone, str, TSDB_TIMEZONE_LEN);
tsSetTimeZone();
cfg->cfgStatus = TAOS_CFG_CSTATUS_OPTION;
tscDebug("timezone set:%s, input:%s by taos_options", tsTimezone, str);
} else {
tscWarn("config option:%s, input value:%s, is configured by %s, use %s", cfg->option, str, tsCfgStatusStr[cfg->cfgStatus], (char *)cfg->ptr);
}
break;
default:
// TODO return the correct error code to client in the format for taos_errstr()
tscError("Invalid option %d", option);
return -1;
}
return 0;
}
#if 0
#include "cJSON.h"
static setConfRet taos_set_config_imp(const char *config){
setConfRet ret = {SET_CONF_RET_SUCC, {0}};
static bool setConfFlag = false;
if (setConfFlag) {
ret.retCode = SET_CONF_RET_ERR_ONLY_ONCE;
strcpy(ret.retMsg, "configuration can only set once");
return ret;
}
taosInitGlobalCfg();
cJSON *root = cJSON_Parse(config);
if (root == NULL){
ret.retCode = SET_CONF_RET_ERR_JSON_PARSE;
strcpy(ret.retMsg, "parse json error");
return ret;
}
int size = cJSON_GetArraySize(root);
if(!cJSON_IsObject(root) || size == 0) {
ret.retCode = SET_CONF_RET_ERR_JSON_INVALID;
strcpy(ret.retMsg, "json content is invalid, must be not empty object");
return ret;
}
if(size >= 1000) {
ret.retCode = SET_CONF_RET_ERR_TOO_LONG;
strcpy(ret.retMsg, "json object size is too long");
return ret;
}
for(int i = 0; i < size; i++){
cJSON *item = cJSON_GetArrayItem(root, i);
if(!item) {
ret.retCode = SET_CONF_RET_ERR_INNER;
strcpy(ret.retMsg, "inner error");
return ret;
}
if(!taosReadConfigOption(item->string, item->valuestring, NULL, NULL, TAOS_CFG_CSTATUS_OPTION, TSDB_CFG_CTYPE_B_CLIENT)){
ret.retCode = SET_CONF_RET_ERR_PART;
if (strlen(ret.retMsg) == 0){
snprintf(ret.retMsg, RET_MSG_LENGTH, "part error|%s", item->string);
}else{
int tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
size_t leftSize = tmp >= 0 ? tmp : 0;
strncat(ret.retMsg, "|", leftSize);
tmp = RET_MSG_LENGTH - 1 - (int)strlen(ret.retMsg);
leftSize = tmp >= 0 ? tmp : 0;
strncat(ret.retMsg, item->string, leftSize);
}
}
}
cJSON_Delete(root);
setConfFlag = true;
return ret;
}
setConfRet taos_set_config(const char *config){
pthread_mutex_lock(&setConfMutex);
setConfRet ret = taos_set_config_imp(config);
pthread_mutex_unlock(&setConfMutex);
return ret;
}
#endif

View File

@ -0,0 +1,18 @@
MESSAGE(STATUS "build parser unit test")
# GoogleTest requires at least C++11
SET(CMAKE_CXX_STANDARD 11)
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(clientTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
clientTest
PUBLIC os util common transport gtest taos
)
TARGET_INCLUDE_DIRECTORIES(
clientTest
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/"
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc"
)

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <iostream>
#include "tglobal.h"
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "taos.h"
namespace {
} // namespace
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
taos_close(pConn);
}

View File

@ -20,8 +20,21 @@
extern "C" {
#endif
#include "tlog.h"
extern int32_t cDebugFlag;
#define tscFatal(...) do { if (cDebugFlag & DEBUG_FATAL) { taosPrintLog("TSC FATAL ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscError(...) do { if (cDebugFlag & DEBUG_ERROR) { taosPrintLog("TSC ERROR ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscWarn(...) do { if (cDebugFlag & DEBUG_WARN) { taosPrintLog("TSC WARN ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscInfo(...) do { if (cDebugFlag & DEBUG_INFO) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebug(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscTrace(...) do { if (cDebugFlag & DEBUG_TRACE) { taosPrintLog("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#define tscDebugL(...) do { if (cDebugFlag & DEBUG_DEBUG) { taosPrintLongString("TSC ", cDebugFlag, __VA_ARGS__); }} while(0)
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_INT_H_*/
#endif /*_TD_COMMON_INT_H_*/

View File

@ -0,0 +1,20 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define TAOS_MESSAGE_C
#include "taosmsg.h"

41
source/common/src/tep.c Normal file
View File

@ -0,0 +1,41 @@
#include "tep.h"
#include "tglobal.h"
#include "tlockfree.h"
int taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
*port = 0;
strcpy(fqdn, ep);
char *temp = strchr(fqdn, ':');
if (temp) {
*temp = 0;
*port = atoi(temp+1);
}
if (*port == 0) {
*port = tsServerPort;
return -1;
}
return 0;
}
bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2) {
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
return false;
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->port[i] != s2->port[i]
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
return false;
}
return true;
}
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet) {
taosCorBeginWrite(&pEpSet->version);
pEpSet->epSet = *pNewEpSet;
taosCorEndWrite(&pEpSet->version);
}

File diff suppressed because it is too large Load Diff

View File

@ -13,6 +13,181 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define TAOS_MESSAGE_C
#include "taosmsg.h"
#include "commonint.h"
int32_t (*tscBuildMsg[TSDB_MSG_TYPE_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
int32_t (*tscProcessMsgRsp[TSDB_MSG_TYPE_MAX])(void* output, char *msg, int32_t msgSize) = {0};
int32_t tscBuildVgroupListReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
*msgLen = 0;
return TSDB_CODE_SUCCESS;
}
int32_t tscBuildTableMetaReqMsg(void* input, char **msg, int32_t msgSize, int32_t *msgLen) {
if (NULL == input || NULL == msg || NULL == msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SBuildTableMetaInput* bInput = (SBuildTableMetaInput *)input;
int32_t estimateSize = sizeof(STableInfoMsg);
if (NULL == *msg || msgSize < estimateSize) {
tfree(*msg);
*msg = calloc(1, estimateSize);
if (NULL == *msg) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
STableInfoMsg *bMsg = (STableInfoMsg *)*msg;
bMsg->msgHead.vgId = bInput->vgId;
strncpy(bMsg->tableFname, bInput->tableFullName, sizeof(bMsg->tableFname));
bMsg->tableFname[sizeof(bMsg->tableFname) - 1] = 0;
*msgLen = (int32_t)sizeof(*bMsg);
return TSDB_CODE_SUCCESS;
}
int32_t tscProcessVgroupListRsp(void* output, char *msg, int32_t msgSize) {
if (NULL == output || NULL == msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
SVgroupListRspMsg *pRsp = (SVgroupListRspMsg *)msg;
pRsp->vgroupNum = htonl(pRsp->vgroupNum);
pRsp->vgroupVersion = htonl(pRsp->vgroupVersion);
if (pRsp->vgroupNum < 0) {
tscError("vgroup number[%d] in rsp is invalid", pRsp->vgroupNum);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
if (pRsp->vgroupVersion < 0) {
tscError("vgroup vgroupVersion[%d] in rsp is invalid", pRsp->vgroupVersion);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
if (msgSize != (pRsp->vgroupNum * sizeof(pRsp->vgroupInfo[0]) + sizeof(*pRsp))) {
tscError("vgroup list msg size mis-match, msgSize:%d, vgroup number:%d", msgSize, pRsp->vgroupNum);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
// keep SVgroupListInfo/SVgroupListRspMsg the same
*(SVgroupListInfo **)output = (SVgroupListInfo *)msg;
if (pRsp->vgroupNum == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < pRsp->vgroupNum; ++i) {
pRsp->vgroupInfo[i].vgId = htonl(pRsp->vgroupInfo[i].vgId);
for (int32_t n = 0; n < pRsp->vgroupInfo[i].numOfEps; ++n) {
pRsp->vgroupInfo[i].epAddr[n].port = htonl(pRsp->vgroupInfo[i].epAddr[n].port);
}
}
return TSDB_CODE_SUCCESS;
}
void msgInit() {
tscBuildMsg[TSDB_MSG_TYPE_TABLE_META] = tscBuildTableMetaReqMsg;
tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST] = tscBuildVgroupListReqMsg;
//tscProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST] = tscProcessVgroupListRsp;
/*
tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;
tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;
tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;
tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;
tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
tscBuildMsg[TSDB_SQL_RETRIEVE] = tscBuildRetrieveFromMgmtMsg;
tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;
tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;
tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;
tscProcessMsgRsp[TSDB_SQL_CONNECT] = tscProcessConnectRsp;
tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE] = tscProcessRetrieveRspFromNode; // rsp handled by same function.
tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_DB] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_CLI_VERSION] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_SERV_STATUS] = tscProcessLocalRetrieveRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
*/
}

View File

@ -112,12 +112,12 @@ int dmnReadConfig(const char *path) {
return -1;
}
if (taosReadGlobalCfg() != 0) {
if (taosReadCfgFromFile() != 0) {
uError("failed to read global config");
return -1;
}
if (taosCheckGlobalCfg() != 0) {
if (taosCheckAndPrintCfg() != 0) {
uError("failed to check global config");
return -1;
}

View File

@ -17,6 +17,7 @@
#include "dndDnode.h"
#include "dndTransport.h"
#include "dndVnodes.h"
#include "tep.h"
int32_t dndGetDnodeId(SDnode *pDnode) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;

View File

@ -72,7 +72,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_CREATE_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_ALTER_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STB] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STB_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_VGROUP_LIST] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
@ -235,18 +235,18 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
if (strcmp(user, INTERNAL_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
char pass[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
return 0;
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
// A simple temporary implementation
char pass[32] = {0};
char pass[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
memcpy(secret, pass, TSDB_KEY_LEN);
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 0;
*encrypt = 0;
*ckey = 0;
@ -288,8 +288,8 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_KEY_LEN);
memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN);
memcpy(secret, pRsp->secret, TSDB_PASSWORD_LEN);
memcpy(ckey, pRsp->ckey, TSDB_PASSWORD_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dDebug("user:%s, success to get user auth from other mnodes", user);
@ -368,4 +368,4 @@ void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pMsg) {
SEpSet epSet = {0};
dndGetMnodeEpSet(pDnode, &epSet);
dndSendMsgToDnode(pDnode, &epSet, pMsg);
}
}

View File

@ -120,7 +120,7 @@ SClient* createClient(const char* user, const char* pass, const char* fqdn, uint
SClient* pClient = (SClient*)calloc(1, sizeof(SClient));
ASSERT(pClient);
char secretEncrypt[32] = {0};
char secretEncrypt[TSDB_PASSWORD_LEN] = {0};
taosEncryptPass((uint8_t*)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit;

View File

@ -180,13 +180,11 @@ typedef struct SAcctObj {
typedef struct SUserObj {
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t superUser;
int32_t acctId;
SHashObj *prohibitDbHash;
} SUserObj;

View File

@ -29,6 +29,7 @@ typedef struct {
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int32_t pid; // pid of app that invokes taosc
int64_t appStartTime; // app start time
int32_t id;
int8_t killed;
int8_t align;
@ -44,7 +45,7 @@ typedef struct {
SQueryDesc *pQueries;
} SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app);
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
@ -102,13 +103,14 @@ void mndCleanupProfile(SMnode *pMnode) {
}
}
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app) {
static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1);
SConnObj connObj = {.pid = pid,
.appStartTime = startTime,
.id = connId,
.killed = 0,
.port = port,
@ -195,6 +197,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SConnectMsg *pReq = pMsg->rpcMsg.pCont;
pReq->pid = htonl(pReq->pid);
pReq->startTime = htobe64(pReq->startTime);
SRpcConnInfo info = {0};
if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) {
@ -216,7 +219,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
mndReleaseDb(pMnode, pDb);
}
SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app);
SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, pReq->startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr());
return -1;
@ -233,9 +236,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser != NULL) {
pRsp->acctId = htonl(pUser->acctId);
pRsp->superAuth = pUser->superAuth;
pRsp->readAuth = pUser->readAuth;
pRsp->writeAuth = pUser->writeAuth;
pRsp->superUser = pUser->superUser;
mndReleaseUser(pMnode, pUser);
}
@ -246,7 +247,8 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
pMsg->contLen = sizeof(SConnectRsp);
pMsg->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->id);
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pReq->app);
return 0;
}
@ -301,7 +303,7 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId);
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app);
pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app, 0);
if (pConn == NULL) {
mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr());
return -1;
@ -368,7 +370,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
@ -399,7 +401,7 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
@ -430,7 +432,7 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
@ -459,7 +461,7 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
@ -587,7 +589,7 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
@ -803,7 +805,7 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;

View File

@ -65,11 +65,9 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.readAuth = 1;
userObj.writeAuth = 1;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.superAuth = 1;
userObj.superUser = 1;
}
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
@ -89,7 +87,7 @@ static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) {
return -1;
}
#endif
#endif
return 0;
}
@ -100,13 +98,11 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
int32_t dataPos = 0;
SDB_SET_BINARY(pRaw, dataPos, pUser->user, TSDB_USER_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_KEY_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->pass, TSDB_PASSWORD_LEN)
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->superAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->readAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->writeAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->superUser)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
@ -128,13 +124,11 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0;
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->user, TSDB_USER_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_KEY_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->pass, TSDB_PASSWORD_LEN)
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->readAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->writeAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superUser)
return pRow;
}
@ -173,13 +167,11 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOldUser, SUserObj *pNewUser) {
mTrace("user:%s, perform update action", pOldUser->user);
memcpy(pOldUser->user, pNewUser->user, TSDB_USER_LEN);
memcpy(pOldUser->pass, pNewUser->pass, TSDB_KEY_LEN);
memcpy(pOldUser->pass, pNewUser->pass, TSDB_PASSWORD_LEN);
memcpy(pOldUser->acct, pNewUser->acct, TSDB_USER_LEN);
pOldUser->createdTime = pNewUser->createdTime;
pOldUser->updateTime = pNewUser->updateTime;
pOldUser->superAuth = pNewUser->superAuth;
pOldUser->readAuth = pNewUser->readAuth;
pOldUser->writeAuth = pNewUser->writeAuth;
pOldUser->superUser = pNewUser->superUser;
return 0;
}
@ -200,9 +192,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superAuth = 0;
userObj.readAuth = 1;
userObj.writeAuth = 1;
userObj.superUser = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) {
@ -515,7 +505,7 @@ static int32_t mndRetrieveUsers(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (pUser->superAuth) {
if (pUser->superUser) {
const char *src = "super";
STR_WITH_SIZE_TO_VARSTR(pWrite, src, strlen(src));
} else {

View File

@ -21,14 +21,58 @@ extern "C" {
#endif
#include "catalog.h"
#include "common.h"
#include "tlog.h"
#define CTG_DEFAULT_CLUSTER_NUMBER 6
#define CTG_DEFAULT_VGROUP_NUMBER 100
#define CTG_DEFAULT_INVALID_VERSION (-1)
typedef struct SVgroupListCache {
int32_t vgroupVersion;
SHashObj *cache; // key:vgId, value:SVgroupInfo*
SArray *arrayCache; // SVgroupInfo
} SVgroupListCache;
typedef struct SDBVgroupCache {
SHashObj *cache; //key:dbname, value:SDBVgroupInfo
} SDBVgroupCache;
typedef struct STableMetaCache {
SHashObj *cache; //key:fulltablename, value:STableMeta
} STableMetaCache;
typedef struct SCatalog {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pData; // items cached for each cluster, the hash key is the cluster-id, returned by mgmt node
SVgroupListCache vgroupCache;
SDBVgroupCache dbCache;
STableMetaCache tableCache;
} SCatalog;
typedef struct SCatalogMgmt {
void *pMsgSender; // used to send messsage to mnode to fetch necessary metadata
SHashObj *pCluster; // items cached for each cluster, the hash key is the cluster-id got from mgmt node
} SCatalogMgmt;
extern int32_t ctgDebugFlag;
#define ctgFatal(...) do { if (ctgDebugFlag & DEBUG_FATAL) { taosPrintLog("CTG FATAL ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgError(...) do { if (ctgDebugFlag & DEBUG_ERROR) { taosPrintLog("CTG ERROR ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgWarn(...) do { if (ctgDebugFlag & DEBUG_WARN) { taosPrintLog("CTG WARN ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgInfo(...) do { if (ctgDebugFlag & DEBUG_INFO) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebug(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); return _code; } } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
#ifdef __cplusplus
}
#endif
#endif /*_TD_CATALOG_INT_H_*/
#endif /*_TD_CATALOG_INT_H_*/

View File

@ -14,11 +14,294 @@
*/
#include "catalogInt.h"
#include "trpc.h"
#include "tmessage.h"
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) {
return (struct SCatalog*) 0x1;
SCatalogMgmt ctgMgmt = {0};
int32_t ctgGetVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SVgroupListInfo** pVgroup) {
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
int32_t code = tscBuildMsg[TSDB_MSG_TYPE_VGROUP_LIST](NULL, &msg, 0, &msgLen);
if (code) {
return code;
}
SRpcMsg rpcMsg = {
.msgType = TSDB_MSG_TYPE_VGROUP_LIST,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
code = tscProcessMsgRsp[TSDB_MSG_TYPE_VGROUP_LIST](pVgroup, rpcRsp.pCont, rpcRsp.contLen);
if (code) {
return code;
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) {
int32_t ctgGetVgroupFromCache(SCatalog* pCatalog, SArray** pVgroupList, int32_t* exist) {
if (NULL == pCatalog->vgroupCache.arrayCache || pCatalog->vgroupCache.vgroupVersion < 0) {
*exist = 0;
return TSDB_CODE_SUCCESS;
}
if (pVgroupList) {
*pVgroupList = taosArrayDup(pCatalog->vgroupCache.arrayCache);
}
*exist = 1;
return TSDB_CODE_SUCCESS;
}
int32_t catalogInit(SCatalogCfg *cfg) {
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == ctgMgmt.pCluster) {
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CLUSTER_NUMBER);
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle) {
if (NULL == clusterId || NULL == catalogHandle) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
if (NULL == ctgMgmt.pCluster) {
ctgError("cluster cache are not ready");
return TSDB_CODE_CTG_NOT_READY;
}
size_t clen = strlen(clusterId);
SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen);
if (clusterCtg) {
*catalogHandle = clusterCtg;
return TSDB_CODE_SUCCESS;
}
clusterCtg = calloc(1, sizeof(*clusterCtg));
if (NULL == clusterCtg) {
ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg));
return TSDB_CODE_CTG_MEM_ERROR;
}
clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
ctgError("put cluster %s cache to hash failed", clusterId);
tfree(clusterCtg);
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
*catalogHandle = clusterCtg;
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetVgroupVersion(struct SCatalog* pCatalog, int32_t* version) {
if (NULL == pCatalog || NULL == version) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
*version = pCatalog->vgroupCache.vgroupVersion;
return TSDB_CODE_SUCCESS;
}
int32_t catalogUpdateVgroup(struct SCatalog* pCatalog, SVgroupListInfo* pVgroup) {
if (NULL == pVgroup) {
ctgError("vgroup get from mnode succeed, but no output");
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
if (pVgroup->vgroupVersion < 0) {
ctgError("vgroup version[%d] is invalid", pVgroup->vgroupVersion);
return TSDB_CODE_CTG_INVALID_INPUT;
}
if (NULL == pCatalog->vgroupCache.arrayCache) {
pCatalog->vgroupCache.arrayCache = taosArrayInit(pVgroup->vgroupNum, sizeof(pVgroup->vgroupInfo[0]));
if (NULL == pCatalog->vgroupCache.arrayCache) {
ctgError("init array[%d] for cluster cache failed", pVgroup->vgroupNum);
return TSDB_CODE_CTG_MEM_ERROR;
}
} else {
taosArrayClear(pCatalog->vgroupCache.arrayCache);
}
if (NULL == pCatalog->vgroupCache.cache) {
pCatalog->vgroupCache.cache = taosHashInit(CTG_DEFAULT_VGROUP_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (NULL == pCatalog->vgroupCache.cache) {
ctgError("init hash[%d] for cluster cache failed", CTG_DEFAULT_VGROUP_NUMBER);
return TSDB_CODE_CTG_MEM_ERROR;
}
} else {
taosHashClear(pCatalog->vgroupCache.cache);
}
SVgroupInfo *vInfo = NULL;
for (int32_t i = 0; i < pVgroup->vgroupNum; ++i) {
vInfo = taosArrayPush(pCatalog->vgroupCache.arrayCache, &pVgroup->vgroupInfo[i]);
if (NULL == vInfo) {
ctgError("push to vgroup array cache failed");
goto error_exit;
}
if (taosHashPut(pCatalog->vgroupCache.cache, &pVgroup->vgroupInfo[i].vgId, sizeof(pVgroup->vgroupInfo[i].vgId), &vInfo, POINTER_BYTES) != 0) {
ctgError("push to vgroup hash cache failed");
goto error_exit;
}
}
pCatalog->vgroupCache.vgroupVersion = pVgroup->vgroupVersion;
return TSDB_CODE_SUCCESS;
error_exit:
if (pCatalog->vgroupCache.arrayCache) {
taosArrayDestroy(pCatalog->vgroupCache.arrayCache);
pCatalog->vgroupCache.arrayCache = NULL;
}
if (pCatalog->vgroupCache.cache) {
taosHashCleanup(pCatalog->vgroupCache.cache);
pCatalog->vgroupCache.cache = NULL;
}
pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
int32_t catalogGetVgroup(SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray** pVgroupList) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pRpc) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
int32_t exist = 0;
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist));
if (exist) {
return TSDB_CODE_SUCCESS;
}
SVgroupListInfo *pVgroup = NULL;
CTG_ERR_RET(ctgGetVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &pVgroup));
CTG_ERR_RET(catalogUpdateVgroup(pCatalog, pVgroup));
if (pVgroupList) {
CTG_ERR_RET(ctgGetVgroupFromCache(pCatalog, pVgroupList, &exist));
}
if (0 == exist) {
ctgError("catalog fetched but get from cache failed");
return TSDB_CODE_CTG_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
if (NULL == pCatalog || NULL == dbName || NULL == version) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
if (NULL == pCatalog->dbCache.cache) {
*version = CTG_DEFAULT_INVALID_VERSION;
return TSDB_CODE_SUCCESS;
}
SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
if (NULL == dbInfo) {
*version = CTG_DEFAULT_INVALID_VERSION;
return TSDB_CODE_SUCCESS;
}
*version = dbInfo->vgroupVersion;
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
}
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
}
int32_t catalogGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, const STagData* tagData, STableMeta* pTableMeta) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
SBuildTableMetaInput bInput = {0};
char *msg = NULL;
SEpSet *pVnodeEpSet = NULL;
int32_t msgLen = 0;
int32_t code = tscBuildMsg[TSDB_MSG_TYPE_TABLE_META](&bInput, &msg, 0, &msgLen);
if (code) {
return code;
}
SRpcMsg rpcMsg = {
.msgType = TSDB_MSG_TYPE_TABLE_META,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pTableName, STableMeta* pTableMeta) {
}
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
if (NULL == pCatalog || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
return TSDB_CODE_CTG_INVALID_INPUT;
}
return 0;
}
void catalogDestroy(void) {
if (ctgMgmt.pCluster) {
taosHashCleanup(ctgMgmt.pCluster); //TBD
ctgMgmt.pCluster = NULL;
}
}

View File

@ -79,13 +79,13 @@ int32_t checkForInvalidExpr(SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf);
* @param msgBufLen
* @return
*/
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen);
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen);
/**
* Destroy the meta data request structure.
* @param pMetaInfo
*/
void qParserClearupMetaRequestInfo(SMetaReq* pMetaInfo);
void qParserClearupMetaRequestInfo(SCatalogReq* pMetaInfo);
#ifdef __cplusplus
}

View File

@ -4077,18 +4077,18 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer
}
#endif
SMetaReq req = {0};
SCatalogReq req = {0};
SMetaData data = {0};
// TODO: check if the qnode info has been cached already
req.qNodeEpset = true;
req.qNodeRequired = true;
code = qParserExtractRequestedMetaInfo(pInfo, &req, msgBuf, msgBufLen);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// load the meta data from catalog
code = catalogGetMetaData(pCatalog, &req, &data);
code = catalogGetAllMeta(pCatalog, NULL, &req, &data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -178,15 +178,15 @@ static int32_t buildTableName(SInsertParseContext* pCxt, SToken* pStname, SArray
return TSDB_CODE_SUCCESS;
}
static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SMetaReq* pMetaReq) {
static int32_t buildMetaReq(SInsertParseContext* pCxt, SToken* pStname, SCatalogReq* pMetaReq) {
pMetaReq->pTableName = taosArrayInit(4, sizeof(SName));
return buildTableName(pCxt, pStname, pMetaReq->pTableName);
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SToken* pTname) {
SMetaReq req;
SCatalogReq req;
CHECK_CODE(buildMetaReq(pCxt, pTname, &req));
CHECK_CODE(catalogGetMetaData(pCxt->pCatalog, &req, &pCxt->meta));
CHECK_CODE(catalogGetTableMeta(pCxt->pCatalog, NULL, NULL, NULL, &pCxt->meta)); //TODO
pCxt->pTableMeta = (STableMeta*)taosArrayGetP(pCxt->meta.pTableMeta, 0);
return TSDB_CODE_SUCCESS;
}
@ -861,13 +861,15 @@ int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
.pComCxt = pContext,
.pSql = pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pCatalog = getCatalogHandle(pContext->pEpSet),
.pCatalog = NULL,
.pTableMeta = NULL,
.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false),
.totalNum = 0,
.pOutput = *pInfo
};
CHECK_CODE(catalogGetHandle(NULL, &context.pCatalog)); //TODO
if (NULL == context.pTableBlockHashObj) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}

View File

@ -43,7 +43,12 @@ int32_t qParseQuerySql(const char* pStr, size_t length, struct SQueryStmtInfo**
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
struct SCatalog* pCatalog = getCatalogHandle(NULL);
struct SCatalog* pCatalog = NULL;
int32_t code = catalogGetHandle(NULL, &pCatalog);
if (code) {
return code;
}
return qParserValidateSqlNode(pCatalog, &info, *pQueryInfo, id, msg, msgLen);
}
@ -132,7 +137,7 @@ static void freePtrElem(void* p) {
tfree(*(char**)p);
}
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMetaInfo, char* msg, int32_t msgBufLen) {
int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SCatalogReq* pMetaInfo, char* msg, int32_t msgBufLen) {
int32_t code = TSDB_CODE_SUCCESS;
SMsgBuf msgBuf = {.buf = msg, .len = msgBufLen};
@ -189,7 +194,7 @@ int32_t qParserExtractRequestedMetaInfo(const SSqlInfo* pSqlInfo, SMetaReq* pMet
return code;
}
void qParserClearupMetaRequestInfo(SMetaReq* pMetaReq) {
void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) {
if (pMetaReq == NULL) {
return;
}

View File

@ -1448,23 +1448,6 @@ void* vgroupInfoClear(SVgroupsInfo *vgroupList) {
return NULL;
}
char* serializeTagData(STagData* pTagData, char* pMsg) {
int32_t n = (int32_t) strlen(pTagData->name);
*(int32_t*) pMsg = htonl(n);
pMsg += sizeof(n);
memcpy(pMsg, pTagData->name, n);
pMsg += n;
*(int32_t*)pMsg = htonl(pTagData->dataLen);
pMsg += sizeof(int32_t);
memcpy(pMsg, pTagData->data, pTagData->dataLen);
pMsg += pTagData->dataLen;
return pMsg;
}
int32_t copyTagData(STagData* dst, const STagData* src) {
dst->dataLen = src->dataLen;
tstrncpy(dst->name, src->name, tListLen(dst->name));

View File

@ -48,6 +48,6 @@ struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) {
return mockCatalogService->getCatalogHandle(pMgmtEps);
}
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) {
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SCatalogReq* pMetaReq, SMetaData* pMetaData) {
return mockCatalogService->catalogGetMetaData(pCatalog, pMetaReq, pMetaData);
}

View File

@ -22,6 +22,6 @@ void generateMetaData(MockCatalogService* mcs);
// mock
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps);
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData);
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SCatalogReq* pMetaReq, SMetaData* pMetaData);
#endif // MOCK_CATALOG_H

View File

@ -87,7 +87,7 @@ public:
return (struct SCatalog*)0x01;
}
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) const {
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SCatalogReq* pMetaReq, SMetaData* pMetaData) const {
assert(nullptr != pMetaReq && 1 == taosArrayGetSize(pMetaReq->pTableName));
SName* fullName = (SName*)taosArrayGet(pMetaReq->pTableName, 0);
std::unique_ptr<STableMeta> table;
@ -248,7 +248,7 @@ struct SCatalog* MockCatalogService::getCatalogHandle(const SEpSet* pMgmtEps) co
return impl_->getCatalogHandle(pMgmtEps);
}
int32_t MockCatalogService::catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) const {
int32_t MockCatalogService::catalogGetMetaData(struct SCatalog* pCatalog, const SCatalogReq* pMetaReq, SMetaData* pMetaData) const {
return impl_->catalogGetMetaData(pCatalog, pMetaReq, pMetaData);
}

View File

@ -50,7 +50,7 @@ public:
MockCatalogService();
~MockCatalogService();
struct SCatalog* getCatalogHandle(const SEpSet* pMgmtEps) const;
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SMetaReq* pMetaReq, SMetaData* pMetaData) const;
int32_t catalogGetMetaData(struct SCatalog* pCatalog, const SCatalogReq* pMetaReq, SMetaData* pMetaData) const;
ITableBuilder& createTableBuilder(const std::string& db, const std::string& tbname, int8_t tableType, int32_t numOfColumns, int32_t numOfTags = 0);
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
void showTables() const;

View File

@ -38,7 +38,7 @@ void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_
strcpy(p->name, name);
}
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq* req) {
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SCatalogReq* req) {
pQueryInfo->numOfTables = 1;
pQueryInfo->pTableMetaInfo = (STableMetaInfo**)calloc(1, POINTER_BYTES);
@ -80,7 +80,7 @@ void sqlCheck(const char* sql, bool valid) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -117,7 +117,7 @@ TEST(testCase, validateAST_test) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -175,7 +175,7 @@ TEST(testCase, function_Test) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -221,7 +221,7 @@ TEST(testCase, function_Test2) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -267,7 +267,7 @@ TEST(testCase, function_Test3) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -312,7 +312,7 @@ TEST(testCase, function_Test4) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -360,7 +360,7 @@ TEST(testCase, function_Test5) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -445,7 +445,7 @@ TEST(testCase, function_Test6) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -523,7 +523,7 @@ TEST(testCase, function_Test6) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -585,7 +585,7 @@ TEST(testCase, function_Test6) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -664,7 +664,7 @@ TEST(testCase, function_Test6) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);

View File

@ -39,7 +39,7 @@ void setSchema(SSchema* p, int32_t type, int32_t bytes, const char* name, int32_
strcpy(p->name, name);
}
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SCatalogReq *req) {
pQueryInfo->numOfTables = 1;
pQueryInfo->pTableMetaInfo = (STableMetaInfo**)calloc(1, POINTER_BYTES);
@ -79,7 +79,7 @@ void generateLogicplan(const char* sql) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
@ -119,7 +119,7 @@ TEST(testCase, planner_test) {
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
ASSERT_EQ(code, 0);
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);

View File

@ -709,7 +709,7 @@ TEST(testCase, extractMeta_test) {
ASSERT_EQ(info1.valid, true);
char msg[128] = {0};
SMetaReq req = {0};
SCatalogReq req = {0};
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
ASSERT_EQ(ret, 0);
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);

View File

@ -228,7 +228,7 @@ static int syncInitRpcServer(SSyncManager* syncManager, const SSyncCluster* pSyn
}
static int syncInitRpcClient(SSyncManager* syncManager) {
char secret[TSDB_KEY_LEN] = "secret";
char secret[TSDB_PASSWORD_LEN] = "secret";
SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "sync-client";

View File

@ -23,11 +23,10 @@ extern "C" {
#include "tlog.h"
extern int32_t rpcDebugFlag;
extern int8_t tscEmbedded;
#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", rpcDebugFlag, __VA_ARGS__); }}
#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR) { taosPrintLog("RPC ERROR ", rpcDebugFlag, __VA_ARGS__); }}
#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", rpcDebugFlag, __VA_ARGS__); }}
#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", tscEmbedded ? 255 : rpcDebugFlag, __VA_ARGS__); }}
#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }}
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", rpcDebugFlag, __VA_ARGS__); }}

View File

@ -51,8 +51,8 @@ typedef struct {
char user[TSDB_UNI_LEN]; // meter ID
char spi; // security parameter index
char encrypt; // encrypt algorithm
char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key
char secret[TSDB_PASSWORD_LEN]; // secret for the link
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
void (*cfp)(void *parent, SRpcMsg *, SEpSet *);
int (*afp)(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey);
@ -97,8 +97,8 @@ typedef struct SRpcConn {
char user[TSDB_UNI_LEN]; // user ID for the link
char spi; // security parameter index
char encrypt; // encryption, 0:1
char secret[TSDB_KEY_LEN]; // secret for the link
char ckey[TSDB_KEY_LEN]; // ciphering key
char secret[TSDB_PASSWORD_LEN]; // secret for the link
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
char secured; // if set to 1, no authentication
uint16_t localPort; // for UDP only
uint32_t linkUid; // connection unique ID assigned by client
@ -229,8 +229,6 @@ static void rpcInitImp(void) {
tsRpcOverhead = sizeof(SRpcReqContext);
tsRpcRefId = taosOpenRef(200, rpcFree);
return 0;
}
int32_t rpcInit(void) {
@ -413,7 +411,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
// for TDengine, all the query, show commands shall have TCP connection
char type = pMsg->msgType;
if (type == TSDB_MSG_TYPE_QUERY || type == TSDB_MSG_TYPE_SHOW_RETRIEVE
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_STB_VGROUP
|| type == TSDB_MSG_TYPE_FETCH || type == TSDB_MSG_TYPE_VGROUP_LIST
|| type == TSDB_MSG_TYPE_TABLES_META || type == TSDB_MSG_TYPE_TABLE_META
|| type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_STATUS || type == TSDB_MSG_TYPE_ALTER_TABLE)
pContext->connType = RPC_CONN_TCPC;
@ -705,7 +703,7 @@ static SRpcConn *rpcAllocateClientConn(SRpcInfo *pRpc) {
pConn->linkUid = (uint32_t)((int64_t)pConn + taosGetPid() + (int64_t)pConn->tranId);
pConn->spi = pRpc->spi;
pConn->encrypt = pRpc->encrypt;
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_KEY_LEN);
if (pConn->spi) memcpy(pConn->secret, pRpc->secret, TSDB_PASSWORD_LEN);
tDebug("%s %p client connection is allocated, uid:0x%x", pRpc->label, pConn, pConn->linkUid);
}
@ -1534,9 +1532,9 @@ static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
int ret = -1;
tMD5Init(&context);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t *)pMsg, msgLen);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
@ -1548,9 +1546,9 @@ static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
tMD5Update(&context, (uint8_t *)pMsg, msgLen);
tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN);
tMD5Update(&context, (uint8_t *)pKey, TSDB_PASSWORD_LEN);
tMD5Final(&context);
memcpy(pAuth, context.digest, sizeof(context.digest));

View File

@ -33,10 +33,12 @@ typedef struct WalFileInfo {
int64_t fileSize;
} WalFileInfo;
#pragma pack(push,1)
typedef struct WalIdxEntry {
int64_t ver;
int64_t offset;
} WalIdxEntry;
#pragma pack(pop)
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft;
@ -78,11 +80,11 @@ static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
}
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
return sprintf(buf, "%s/%020" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer);
}
static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
return sprintf(buf, "%s/%020" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer);
}
static inline int walValidHeadCksum(SWalHead* pHead) {

View File

@ -68,9 +68,12 @@ int32_t walInit() {
}
void walCleanUp() {
int old = atomic_val_compare_exchange_8(&tsWal.inited, 1, 0);
if(old == 0) {
return;
}
walStopThread();
taosCloseRef(tsWal.refSetId);
atomic_store_8(&tsWal.inited, 0);
wInfo("wal module is cleaned up");
}
@ -252,9 +255,8 @@ static int32_t walCreateThread() {
static void walStopThread() {
atomic_store_8(&tsWal.stop, 1);
if (tsWal.thread != NULL && taosCheckPthreadValid(tsWal.thread)) {
if (taosCheckPthreadValid(tsWal.thread)) {
pthread_join(tsWal.thread, NULL);
tsWal.thread = NULL;
}
wDebug("wal thread is stopped");

View File

@ -21,16 +21,25 @@ SWalReadHandle* walOpenReadHandle(SWal* pWal) {
if(pRead == NULL) {
return NULL;
}
memset(pRead, 0, sizeof(SWalReadHandle));
pRead->pWal = pWal;
pRead->readIdxTfd = -1;
pRead->readLogTfd = -1;
return NULL;
pRead->curVersion = -1;
pRead->curFileFirstVer = -1;
pRead->capacity = 0;
pRead->status = 0;
pRead->pHead = malloc(sizeof(SWalHead));
if(pRead->pHead == NULL) {
free(pRead);
return NULL;
}
return pRead;
}
void walCloseReadHandle(SWalReadHandle *pRead) {
tfClose(pRead->readIdxTfd);
tfClose(pRead->readLogTfd);
tfree(pRead->pHead);
free(pRead);
}
@ -47,18 +56,17 @@ static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
//seek position
int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) {
if(code < 0) {
return -1;
}
WalIdxEntry entry;
code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry));
if(code != 0) {
if(tfRead(idxTfd, &entry, sizeof(WalIdxEntry)) != sizeof(WalIdxEntry)) {
return -1;
}
//TODO:deserialize
ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code != 0) {
if (code < 0) {
return -1;
}
return code;
@ -71,13 +79,13 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
tfClose(pRead->readLogTfd);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int logTfd = tfOpenRead(fnameStr);
int64_t logTfd = tfOpenRead(fnameStr);
if(logTfd < 0) {
return -1;
}
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
int idxTfd = tfOpenRead(fnameStr);
int64_t idxTfd = tfOpenRead(fnameStr);
if(idxTfd < 0) {
return -1;
}
@ -90,7 +98,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
int code;
SWal *pWal = pRead->pWal;
if(ver == pWal->vers.lastVer) {
if(ver == pRead->curVersion) {
return 0;
}
if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
@ -126,33 +134,41 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
//TODO: check wal life
if(pRead->curVersion != ver) {
walReadSeekVer(pRead, ver);
code = walReadSeekVer(pRead, ver);
if(code != 0) {
return -1;
}
}
if(!tfValid(pRead->readLogTfd)) return -1;
if(sizeof(SWalHead) != tfRead(pRead->readLogTfd, &pRead->head, sizeof(SWalHead))) {
code = tfRead(pRead->readLogTfd, pRead->pHead, sizeof(SWalHead));
if(code != sizeof(SWalHead)) {
return -1;
}
code = walValidHeadCksum(&pRead->head);
code = walValidHeadCksum(pRead->pHead);
if(code != 0) {
return -1;
}
if(pRead->capacity < pRead->head.head.len) {
void* ptr = realloc(pRead, pRead->head.head.len);
if(pRead->capacity < pRead->pHead->head.len) {
void* ptr = realloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
if(ptr == NULL) {
return -1;
}
pRead = ptr;
pRead->capacity = pRead->head.head.len;
pRead->pHead = ptr;
pRead->capacity = pRead->pHead->head.len;
}
if(pRead->head.head.len != tfRead(pRead->readLogTfd, &pRead->head.head.body, pRead->head.head.len)) {
if(pRead->pHead->head.len != tfRead(pRead->readLogTfd, pRead->pHead->head.body, pRead->pHead->head.len)) {
return -1;
}
code = walValidBodyCksum(&pRead->head);
/*code = walValidBodyCksum(pRead->pHead);*/
ASSERT(pRead->pHead->head.version == ver);
if(code != 0) {
return -1;
}
pRead->curVersion++;
return 0;
}

View File

@ -377,11 +377,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
//must truncate explicitly first
return -1;
}
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
/*if (!tfValid(pWal->writeLogTfd)) return -1;*/
pthread_mutex_lock(&pWal->mutex);
pWal->writeHead.head.version = index;
int64_t offset = walGetCurFileOffset(pWal);
pWal->writeHead.head.len = bodyLen;
pWal->writeHead.head.msgType = msgType;
pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
@ -393,12 +394,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) {
if (tfWrite(pWal->writeLogTfd, (char*)body, bodyLen) != bodyLen) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
code = walWriteIndex(pWal, index, offset);
if(code != 0) {
//TODO
return -1;

View File

@ -5,6 +5,9 @@
#include "walInt.h"
const char* ranStr = "tvapq02tcp";
const int ranStrLen = strlen(ranStr);
class WalCleanEnv : public ::testing::Test {
protected:
static void SetUpTestCase() {
@ -157,15 +160,13 @@ TEST_F(WalCleanEnv, removeOldMeta) {
TEST_F(WalKeepEnv, readOldMeta) {
walResetEnv();
const char* ranStr = "tvapq02tcp";
int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
@ -179,7 +180,7 @@ TEST_F(WalKeepEnv, readOldMeta) {
char* newss = walMetaSerialize(pWal);
len = strlen(oldss);
int len = strlen(oldss);
ASSERT_EQ(len, strlen(newss));
for(int i = 0; i < len; i++) {
EXPECT_EQ(oldss[i], newss[i]);
@ -189,14 +190,12 @@ TEST_F(WalKeepEnv, readOldMeta) {
}
TEST_F(WalCleanEnv, write) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len);
code = walWrite(pWal, i+2, i, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->vers.lastVer, i);
}
@ -205,11 +204,9 @@ TEST_F(WalCleanEnv, write) {
}
TEST_F(WalCleanEnv, rollback) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len);
code = walWrite(pWal, i, i+1, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
}
@ -224,12 +221,10 @@ TEST_F(WalCleanEnv, rollback) {
}
TEST_F(WalCleanDeleteEnv, roll) {
const char* ranStr = "tvapq02tcp";
const int len = strlen(ranStr);
int code;
int i;
for(i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len);
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->vers.lastVer, i);
code = walCommit(pWal, i);
@ -242,19 +237,55 @@ TEST_F(WalCleanDeleteEnv, roll) {
ASSERT_EQ(pWal->vers.snapshotVer, i-1);
ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, len);
code = walWrite(pWal, 5, 0, (void*)ranStr, ranStrLen);
ASSERT_NE(code, 0);
for(; i < 200; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len);
code = walWrite(pWal, i, 0, (void*)ranStr, ranStrLen);
ASSERT_EQ(code, 0);
code = walCommit(pWal, i);
ASSERT_EQ(pWal->vers.commitVer, i);
}
//code = walWriteMeta(pWal);
code = walBeginTakeSnapshot(pWal, i - 1);
ASSERT_EQ(code, 0);
code = walEndTakeSnapshot(pWal);
ASSERT_EQ(code, 0);
}
TEST_F(WalKeepEnv, readHandleRead) {
walResetEnv();
int code;
SWalReadHandle* pRead = walOpenReadHandle(pWal);
ASSERT(pRead != NULL);
int i ;
for(i = 0; i < 100; i++) {
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, i);
int len = strlen(newStr);
code = walWrite(pWal, i, 0, newStr, len);
ASSERT_EQ(code, 0);
}
for(int i = 0; i < 1000; i++) {
int ver = rand() % 100;
code = walReadWithHandle(pRead, ver);
ASSERT_EQ(code, 0);
//printf("rrbody: \n");
//for(int i = 0; i < pRead->pHead->head.len; i++) {
//printf("%d ", pRead->pHead->head.body[i]);
//}
//printf("\n");
ASSERT_EQ(pRead->pHead->head.version, ver);
ASSERT_EQ(pRead->curVersion, ver+1);
char newStr[100];
sprintf(newStr, "%s-%d", ranStr, ver);
int len = strlen(newStr);
ASSERT_EQ(pRead->pHead->head.len, len);
for(int j = 0; j < len; j++) {
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
}
}
}

View File

@ -42,7 +42,7 @@ bool taosComparePthread(pthread_t first, pthread_t second) { return first.p == s
int32_t taosGetPId() { return GetCurrentProcessId(); }
int32_t taosGetCurrentAPPName(char* name, int32_t* len) {
int32_t taosGetAppName(char* name, int32_t* len) {
char filepath[1024] = {0};
GetModuleFileName(NULL, filepath, MAX_PATH);
@ -358,7 +358,7 @@ bool taosComparePthread(pthread_t first, pthread_t second) { return pthread_equa
int32_t taosGetPId() { return (int32_t)getpid(); }
int32_t taosGetCurrentAPPName(char *name, int32_t *len) {
int32_t taosGetAppName(char *name, int32_t *len) {
char buf[PATH_MAX + 1];
buf[0] = '\0';
proc_name(getpid(), buf, sizeof(buf) - 1);
@ -392,7 +392,7 @@ void taosResetPthread(pthread_t* thread) { *thread = 0; }
bool taosComparePthread(pthread_t first, pthread_t second) { return first == second; }
int32_t taosGetPId() { return getpid(); }
int32_t taosGetCurrentAPPName(char* name, int32_t* len) {
int32_t taosGetAppName(char* name, int32_t* len) {
const char* self = "/proc/self/exe";
char path[PATH_MAX] = {0};

View File

@ -275,13 +275,13 @@ char *strsep(char **stringp, const char *delim) {
}
char *getpass(const char *prefix) {
static char passwd[TSDB_KEY_LEN] = {0};
memset(passwd, 0, TSDB_KEY_LEN);
static char passwd[TSDB_PASSWORD_LEN] = {0};
memset(passwd, 0, TSDB_PASSWORD_LEN);
//printf("%s", prefix);
int32_t index = 0;
char ch;
while (index < TSDB_KEY_LEN) {
while (index < TSDB_PASSWORD_LEN) {
ch = getch();
if (ch == '\n' || ch == '\r') {
break;

View File

@ -714,7 +714,7 @@ static void taosGetSystemLocale() { // get and set default locale
//printf("locale not configured, set to system default:%s", tsLocale);
}
/* if user does not specify the charset, extract it from locale */
// if user does not specify the charset, extract it from locale
char *str = strrchr(tsLocale, sep);
if (str != NULL) {
str++;
@ -1118,13 +1118,13 @@ char *taosGetCmdlineByPID(int pid) {
SysNameInfo taosGetSysNameInfo() {
SysNameInfo info = {0};
struct utsname buf;
if (!uname(&buf)) {
info.sysname = buf.sysname;
info.sysname == buf.nodename;
info.sysname = buf.release;
info.sysname = buf.version;
info.sysname = buf.machine;
struct utsname uts;
if (!uname(&uts)) {
info.sysname = strdup(uts.sysname);
info.nodename = strdup(uts.nodename);
info.release = strdup(uts.release);
info.version = strdup(uts.version);
info.machine = strdup(uts.machine);
}
return info;

View File

@ -282,7 +282,7 @@ static void taosReadConfigOption(const char *option, char *value, char *value2,
}
}
void taosInitConfigOption(SGlobalCfg cfg) {
void taosAddConfigOption(SGlobalCfg cfg) {
tsGlobalConfig[tsGlobalConfigNum++] = cfg;
}
@ -335,7 +335,7 @@ void taosReadGlobalLogCfg() {
fclose(fp);
}
int32_t taosReadGlobalCfg() {
int32_t taosReadCfgFromFile() {
char * line, *option, *value, *value2, *value3;
int olen, vlen, vlen2, vlen3;
char fileName[PATH_MAX] = {0};
@ -396,7 +396,7 @@ int32_t taosReadGlobalCfg() {
return 0;
}
void taosPrintGlobalCfg() {
void taosPrintCfg() {
uInfo(" taos config & system info:");
uInfo("==================================");
@ -443,7 +443,6 @@ void taosPrintGlobalCfg() {
}
taosPrintOsInfo();
// taosPrintDataDirCfg();
uInfo("==================================");
}

View File

@ -127,6 +127,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DUP_TAG_NAMES, "duplicated tag names"
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON, "Invalid JSON format")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_JSON_TYPE, "Invalid JSON data type")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input")
// mnode
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed")
@ -497,6 +498,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_FILE_ALREADY_EXISTS, "tfs file already exis
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_LEVEL, "tfs invalid level")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk")
// catalog
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog interval error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
#ifdef TAOS_ERROR_C
};
#endif
@ -545,4 +555,4 @@ const char* tstrerror(int32_t err) {
return "";
}
const char* terrstr() { return tstrerror(terrno); }
const char* terrstr() { return tstrerror(terrno); }

View File

@ -99,6 +99,8 @@ int32_t wDebugFlag = 135;
int32_t tsdbDebugFlag = 131;
int32_t cqDebugFlag = 131;
int32_t fsDebugFlag = 135;
int32_t ctgDebugFlag = 131;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
@ -112,7 +114,7 @@ static int32_t taosPushLogBuffer(SLogBuff *tLogBuff, char *msg, int32_t msgLen
static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(int32_t oldFd);
static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum);
extern void taosPrintGlobalCfg();
extern void taosPrintCfg();
static int32_t taosCompressFile(char *srcFileName, char *destFileName);
static int32_t taosStartLog() {
@ -222,7 +224,7 @@ static void *taosThreadToOpenNewFile(void *param) {
uInfo(" new log file:%d is opened", tsLogObj.flag);
uInfo("==================================");
taosPrintGlobalCfg();
taosPrintCfg();
taosKeepOldLog(keepName);
return NULL;

View File

@ -49,15 +49,6 @@ int32_t taosInitNotes() {
taosInitNote(tsNumOfLogLines, 1, &tsTscNote, name);
}
if (tsHttpEnableRecordSql) {
snprintf(name, TSDB_FILENAME_LEN * 2, "%s/httpsql", tsLogDir);
taosInitNote(tsNumOfLogLines, 1, &tsHttpNote, name);
}
if (tscEmbedded == 1) {
snprintf(name, TSDB_FILENAME_LEN * 2, "%s/taosinfo", tsLogDir);
taosInitNote(tsNumOfLogLines, 1, &tsInfoNote, name);
}
#endif
return 0;
}

View File

@ -20,12 +20,10 @@
#include "tutil.h"
#include "taoserror.h"
extern int8_t tscEmbedded;
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR ", tscEmbedded ? 255 : tmrDebugFlag, __VA_ARGS__); }}
#define tmrFatal(...) { if (tmrDebugFlag & DEBUG_FATAL) { taosPrintLog("TMR FATAL ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrError(...) { if (tmrDebugFlag & DEBUG_ERROR) { taosPrintLog("TMR ERROR ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrWarn(...) { if (tmrDebugFlag & DEBUG_WARN) { taosPrintLog("TMR WARN ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrInfo(...) { if (tmrDebugFlag & DEBUG_INFO) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrDebug(...) { if (tmrDebugFlag & DEBUG_DEBUG) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}
#define tmrTrace(...) { if (tmrDebugFlag & DEBUG_TRACE) { taosPrintLog("TMR ", tmrDebugFlag, __VA_ARGS__); }}

View File

@ -417,16 +417,3 @@ void taosIp2String(uint32_t ip, char *str) {
void taosIpPort2String(uint32_t ip, uint16_t port, char *str) {
sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port);
}
int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port) {
*port = 0;
strcpy(fqdn, ep);
char *temp = strchr(fqdn, ':');
if (temp) {
*temp = 0;
*port = atoi(temp + 1);
}
return 0;
}

View File

@ -26,7 +26,7 @@ typedef int32_t (*FCqWrite)(int32_t vgId, void *pHead, int32_t qtype, void *pMsg
typedef struct {
int32_t vgId;
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]; // size must same with SVnodeObj.db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN]
FCqWrite cqWrite;
} SCqCfg;
@ -37,7 +37,7 @@ typedef struct {
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite;
struct SCqObj *pHead;

View File

@ -17515,7 +17515,7 @@
fun:gaih_inet.constprop.0
fun:getaddrinfo
fun:taosGetFqdn
fun:taosCheckGlobalCfg
fun:taosCheckAndPrintCfg
fun:taos_init_imp
}
{
@ -17740,7 +17740,7 @@
fun:gaih_inet.constprop.7
fun:getaddrinfo
fun:taosGetFqdn
fun:taosCheckGlobalCfg
fun:taosCheckAndPrintCfg
fun:taos_init_imp
}
{