Merge branch '3.0' into fix/3_liaohj

This commit is contained in:
Haojun Liao 2023-07-20 12:15:23 +08:00
commit 87fa8f0c4c
55 changed files with 923 additions and 322 deletions

View File

@ -42,10 +42,20 @@ In TDengine, the data types below can be used when specifying a column or tag.
| 14 | NCHAR | User Defined | Multi-byte string that can include multi byte characters like Chinese characters. Each character of NCHAR type consumes 4 bytes storage. The string value should be quoted with single quotes. Literal single quote inside the string must be preceded with backslash, like `\'`. The length must be specified when defining a column or tag of NCHAR type, for example nchar(10) means it can store at most 10 characters of nchar type and will consume fixed storage of 40 bytes. An error will be reported if the string value exceeds the length defined. |
| 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. |
| 16 | VARCHAR | User-defined | Alias of BINARY |
| 16 | GEOMETRY | User-defined | Geometry |
:::note
- Each row of the table cannot be longer than 48KB (64KB since version 3.0.5.0) (note that each BINARY/NCHAR/GEOMETRY column takes up an additional 2 bytes of storage space).
- Only ASCII visible characters are suggested to be used in a column or tag of BINARY type. Multi-byte characters must be stored in NCHAR type.
- The length of BINARY can be up to 16,374(data column is 65,517 and tag column is 16,382 since version 3.0.5.0) bytes. The string value must be quoted with single quotes. You must specify a length in bytes for a BINARY value, for example binary(20) for up to twenty single-byte characters. If the data exceeds the specified length, an error will occur. The literal single quote inside the string must be preceded with back slash like `\'`
- The maximum length of the GEOMETRY data column is 65,517 bytes, and the maximum length of the tag column is 16,382 bytes. Supports POINT, LINESTRING, and POLYGON subtypes of 2D. The following table describes the length calculation method:
| # | **Syntax** | **MinLen** | **MaxLen** | **Growth of each point** |
|---|--------------------------------------|------------|------------|--------------------------|
| 1 | POINT(1.0 1.0) | 21 | 21 | NA |
| 2 | LINESTRING(1.0 1.0, 2.0 2.0) | 9+2*16 | 9+4094*16 | +16 |
| 3 | POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0)) | 13+3*16 | 13+4094*16 | +16 |
- Numeric values in SQL statements will be determined as integer or float type according to whether there is decimal point or whether scientific notation is used, so attention must be paid to avoid overflow. For example, 9999999999999999999 will be considered as overflow because it exceeds the upper limit of long integer, but 9999999999999999999.0 will be considered as a legal float number.
:::

View File

@ -45,9 +45,9 @@ table_option: {
1. The first column of a table MUST be of type TIMESTAMP. It is automatically set as the primary key.
2. The maximum length of the table name is 192 bytes.
3. The maximum length of each row is 48k(64k since version 3.0.5.0) bytes, please note that the extra 2 bytes used by each BINARY/NCHAR column are also counted.
3. The maximum length of each row is 48k(64k since version 3.0.5.0) bytes, please note that the extra 2 bytes used by each BINARY/NCHAR/GEOMETRY column are also counted.
4. The name of the subtable can only consist of characters from the English alphabet, digits and underscore. Table names can't start with a digit. Table names are case insensitive.
5. The maximum length in bytes must be specified when using BINARY or NCHAR types.
5. The maximum length in bytes must be specified when using BINARY/NCHAR/GEOMETRY types.
6. Escape character "\`" can be used to avoid the conflict between table names and reserved keywords, above rules will be bypassed when using escape character on table names, but the upper limit for the name length is still valid. The table names specified using escape character are case sensitive.
For example \`aBc\` and \`abc\` are different table names but `abc` and `aBc` are same table names because they are both converted to `abc` internally.
Only ASCII visible characters can be used with escape character.

View File

@ -39,7 +39,7 @@ TDengine supports the `UNION` and `UNION ALL` operations. UNION ALL collects all
| 3 | \>, < | All types except BLOB, MEDIUMBLOB, and JSON | Greater than and less than |
| 4 | \>=, <= | All types except BLOB, MEDIUMBLOB, and JSON | Greater than or equal to and less than or equal to |
| 5 | IS [NOT] NULL | All types | Indicates whether the value is null |
| 6 | [NOT] BETWEEN AND | All types except BLOB, MEDIUMBLOB, and JSON | Closed interval comparison |
| 6 | [NOT] BETWEEN AND | All types except BLOB, MEDIUMBLOB, JSON and GEOMETRY | Closed interval comparison |
| 7 | IN | All types except BLOB, MEDIUMBLOB, and JSON; the primary key (timestamp) is also not supported | Equal to any value in the list |
| 8 | LIKE | BINARY, NCHAR, and VARCHAR | Wildcard match |
| 9 | MATCH, NMATCH | BINARY, NCHAR, and VARCHAR | Regular expression match |

View File

@ -18,6 +18,7 @@ description: This document describes how TDengine SQL has changed in version 3.0
| 8 | Mixed operations | Enhanced | Mixing scalar and vector operations in queries has been enhanced and is supported in all SELECT clauses.
| 9 | Tag operations | Added | Tag columns can be used in queries and clauses like data columns.
| 10 | Timeline clauses and time functions in supertables | Enhanced | When PARTITION BY is not used, data in supertables is merged into a single timeline.
| 11 | GEOMETRY | Added | Geometry
## SQL Syntax

View File

@ -166,7 +166,7 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo
| Attribute | Description |
| ------------- | ---------------------------------------------------------------------------- |
| Applicable | Server Only |
| Applicable | Server and Client |
| Meaning | Switch for allowing TDengine to collect and report service usage information |
| Value Range | 0: Not allowed; 1: Allowed |
| Default Value | 1 |
@ -174,7 +174,7 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo
| Attribute | Description |
| ------------- | ---------------------------------------------------------------------------- |
| Applicable | Server Only |
| Applicable | Server and Client |
| Meaning | Switch for allowing TDengine to collect and report crash related information |
| Value Range | 0,1 0: Not allowed; 1: allowed |
| Default Value | 1 |

View File

@ -42,12 +42,21 @@ CREATE DATABASE db_name PRECISION 'ns';
| 14 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 NCHAR 字符占用 4 字节的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\'`。NCHAR 使用时须指定字符串大小,类型为 NCHAR(10) 的列表示此列的字符串最多存储 10 个 NCHAR 字符。如果用户字符串长度超出声明长度,将会报错。 |
| 15 | JSON | | JSON 数据类型, 只有 Tag 可以是 JSON 格式 |
| 16 | VARCHAR | 自定义 | BINARY 类型的别名 |
| 17 | GEOMETRY | 自定义 | 几何类型 |
:::note
- 表的每行长度不能超过 48KB从 3.0.5.0 版本开始为 64KB注意每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置)。
- 表的每行长度不能超过 48KB从 3.0.5.0 版本开始为 64KB注意每个 BINARY/NCHAR/GEOMETRY 类型的列还会额外占用 2 个字节的存储位置)。
- 虽然 BINARY 类型在底层存储上支持字节型的二进制字符,但不同编程语言对二进制数据的处理方式并不保证一致,因此建议在 BINARY 类型中只存储 ASCII 可见字符,而避免存储不可见字符。多字节的数据,例如中文字符,则需要使用 NCHAR 类型进行保存。如果强行使用 BINARY 类型保存中文字符,虽然有时也能正常读写,但并不带有字符集信息,很容易出现数据乱码甚至数据损坏等情况。
- BINARY 类型理论上最长可以有 16,374从 3.0.5.0 版本开始,数据列为 65,517标签列为 16,382 字节。BINARY 仅支持字符串输入,字符串两端需使用单引号引用。使用时须指定大小,如 BINARY(20) 定义了最长为 20 个单字节字符的字符串,每个字符占 1 字节的存储空间,总共固定占用 20 字节的空间,此时如果用户字符串超出 20 字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示,即 `\'`
- GEOMETRY 类型数据列为最大长度为 65,517 字节,标签列最大长度为 16,382 字节。支持 2D 的 POINT、LINESTRING 和 POLYGON 子类型数据。长度计算方式如下表所示:
| # | **语法** | **最小长度** | **最大长度** | **每组坐标长度增长** |
|---|--------------------------------------|----------|------------|--------------|
| 1 | POINT(1.0 1.0) | 21 | 21 | 无 |
| 2 | LINESTRING(1.0 1.0, 2.0 2.0) | 9+2*16 | 9+4094*16 | +16 |
| 3 | POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0)) | 13+3*16 | 13+4094*16 | +16 |
- SQL 语句中的数值类型将依据是否存在小数点或使用科学计数法表示来判断数值类型是否为整型或者浮点型因此在使用时要注意相应类型越界的情况。例如9999999999999999999 会认为超过长整型的上边界而溢出,而 9999999999999999999.0 会被认为是有效的浮点数。
:::

View File

@ -43,9 +43,9 @@ table_option: {
1. 表的第一个字段必须是 TIMESTAMP并且系统自动将其设为主键
2. 表名最大长度为 192
3. 表的每行长度不能超过 48KB从 3.0.5.0 版本开始为 64KB;(注意:每个 BINARY/NCHAR 类型的列还会额外占用 2 个字节的存储位置)
3. 表的每行长度不能超过 48KB从 3.0.5.0 版本开始为 64KB;(注意:每个 BINARY/NCHAR/GEOMETRY 类型的列还会额外占用 2 个字节的存储位置)
4. 子表名只能由字母、数字和下划线组成,且不能以数字开头,不区分大小写
5. 使用数据类型 binary 或 nchar需指定其最长的字节数如 binary(20),表示 20 字节;
5. 使用数据类型 BINARY/NCHAR/GEOMETRY需指定其最长的字节数如 BINARY(20),表示 20 字节;
6. 为了兼容支持更多形式的表名TDengine 引入新的转义符 "\`",可以让表名与关键词不冲突,同时不受限于上述表名称合法性约束检查。但是同样具有长度限制要求。使用转义字符以后,不再对转义字符中的内容进行大小写统一。
例如:\`aBc\` 和 \`abc\` 是不同的表名,但是 abc 和 aBc 是相同的表名。
需要注意的是转义字符中的内容必须是可打印字符。

View File

@ -39,7 +39,7 @@ TDengine 支持 `UNION ALL` 和 `UNION` 操作符。UNION ALL 将查询返回的
| 3 | \>, < | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于,小于 |
| 4 | \>=, <= | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于等于,小于等于 |
| 5 | IS [NOT] NULL | 所有类型 | 是否为空值 |
| 6 | [NOT] BETWEEN AND | 除 BOOL、BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 闭区间比较 |
| 6 | [NOT] BETWEEN AND | 除 BOOL、BLOB、MEDIUMBLOB、JSON 和 GEOMETRY 外的所有类型 | 闭区间比较 |
| 7 | IN | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 与列表内的任意值相等 |
| 8 | LIKE | BINARY、NCHAR 和 VARCHAR | 通配符匹配 |
| 9 | MATCH, NMATCH | BINARY、NCHAR 和 VARCHAR | 正则表达式匹配 |

View File

@ -18,6 +18,7 @@ description: "TDengine 3.0 版本的语法变更说明"
| 8 | 混合运算 | 增强 | 查询中的混合运算标量运算和矢量运算混合全面增强SELECT的各个子句均全面支持符合语法语义的混合运算。
| 9 | 标签运算 | 新增 |在查询中,标签列可以像普通列一样参与各种运算,用于各种子句。
| 10 | 时间线子句和时间函数用于超级表查询 | 增强 |没有PARTITION BY时超级表的数据会被合并成一条时间线。
| 11 | GEOMETRY | 新增 | 几何类型。
## SQL 语句变更

View File

@ -184,7 +184,7 @@ taos -C
| 属性 | 说明 |
| -------- | ------------------------ |
| 适用范围 | 仅服务端适用 |
| 适用范围 | 客户端和服务端都适用 |
| 含义 | 是否上传 telemetry |
| 取值范围 | 0,1 0: 不上传1上传 |
| 缺省值 | 1 |
@ -193,7 +193,7 @@ taos -C
| 属性 | 说明 |
| -------- | ------------------------ |
| 适用范围 | 仅服务端适用 |
| 适用范围 | 客户端和服务端都适用 |
| 含义 | 是否上传 crash 信息 |
| 取值范围 | 0,1 0: 不上传1上传 |
| 缺省值 | 1 |

View File

@ -1495,6 +1495,7 @@ int32_t tDeserializeSShowVariablesReq(void* buf, int32_t bufLen, SShowVariablesR
typedef struct {
char name[TSDB_CONFIG_OPTION_LEN + 1];
char value[TSDB_CONFIG_VALUE_LEN + 1];
char scope[TSDB_CONFIG_SCOPE_LEN + 1];
} SVariablesInfo;
typedef struct {

View File

@ -36,9 +36,10 @@ extern "C" {
#define SHOW_CREATE_TB_RESULT_FIELD1_LEN (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE)
#define SHOW_CREATE_TB_RESULT_FIELD2_LEN (TSDB_MAX_ALLOWED_SQL_LEN * 3)
#define SHOW_LOCAL_VARIABLES_RESULT_COLS 2
#define SHOW_LOCAL_VARIABLES_RESULT_COLS 3
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_ALIVE_RESULT_COLS 1

View File

@ -69,6 +69,13 @@ void tfsUpdateSize(STfs *pTfs);
*/
SDiskSize tfsGetSize(STfs *pTfs);
/**
* @brief Get the number of disks at level of multi-tier storage.
*
* @param pTfs
* @return int32_t
*/
int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level);
/**
* @brief Get level of multi-tier storage.
*
@ -123,6 +130,15 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname);
*/
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Recursive make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecur(STfs *pTfs, const char *rname);
/**
* @brief Recursive create directories in tfs.
*
@ -160,7 +176,17 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname);
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname);
int32_t tfsRename(STfs *pTfs, int32_t diskPrimary, const char *orname, const char *nrname);
/**
* @brief Search fname in level of tfs
*
* @param pTfs The fs object.
* @param level The level to search on
* @param fname The relative file name to be searched
* @param int32_t diskId for successs, -1 for failure
*/
int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname);
/**
* @brief Init file object in tfs.

View File

@ -416,7 +416,7 @@ int32_t* taosGetErrno();
// #define TSDB_CODE_VND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0501) // 2.x
// #define TSDB_CODE_VND_ACTION_NEED_REPROCESS. TAOS_DEF_ERROR_CODE(0, 0x0502) // 2.x
#define TSDB_CODE_VND_INVALID_VGROUP_ID TAOS_DEF_ERROR_CODE(0, 0x0503)
// #define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504) // 2.x
#define TSDB_CODE_VND_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x0504)
// #define TSDB_CODE_VND_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x0505) // 2.x
// #define TSDB_CODE_VND_NO_DISK_PERMISSIONS TAOS_DEF_ERROR_CODE(0, 0x0506) // 2.x
// #define TSDB_CODE_VND_NO_SUCH_FILE_OR_DIR TAOS_DEF_ERROR_CODE(0, 0x0507) // 2.x

View File

@ -50,10 +50,16 @@ typedef enum {
CFG_DTYPE_TIMEZONE
} ECfgDataType;
typedef enum {
CFG_SCOPE_SERVER,
CFG_SCOPE_CLIENT,
CFG_SCOPE_BOTH
} ECfgScopeType;
typedef struct SConfigItem {
ECfgSrcType stype;
ECfgDataType dtype;
bool tsc;
int8_t scope;
char *name;
union {
bool bval;
@ -92,20 +98,21 @@ int32_t cfgGetSize(SConfig *pCfg);
SConfigItem *cfgGetItem(SConfig *pCfg, const char *name);
int32_t cfgSetItem(SConfig *pCfg, const char *name, const char *value, ECfgSrcType stype);
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, bool tsc);
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, bool tsc);
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, bool tsc);
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, double minval, double maxval, bool tsc);
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, bool tsc);
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, bool tsc);
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal);
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal);
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal);
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope);
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope);
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope);
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, double minval, double maxval, int8_t scope);
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope);
const char *cfgStypeStr(ECfgSrcType type);
const char *cfgDtypeStr(ECfgDataType type);
void cfgDumpItemValue(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *pLen);
void cfgDumpItemScope(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *pLen);
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump);

View File

@ -492,6 +492,7 @@ enum {
#define TSDB_CONFIG_OPTION_LEN 32
#define TSDB_CONFIG_VALUE_LEN 64
#define TSDB_CONFIG_SCOPE_LEN 8
#define TSDB_CONFIG_NUMBER 8
#define QUERY_ID_SIZE 20

View File

@ -46,9 +46,10 @@ enum {
RES_TYPE__TMQ_METADATA,
};
#define SHOW_VARIABLES_RESULT_COLS 2
#define SHOW_VARIABLES_RESULT_COLS 3
#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)

View File

@ -428,13 +428,16 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
int32_t numOfCfg = taosArrayGetSize(pVars);
blockDataEnsureCapacity(pBlock, numOfCfg);
@ -450,6 +453,11 @@ static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false);
}
pBlock->info.rows = numOfCfg;

View File

@ -271,6 +271,7 @@ static const SSysDbTableSchema variablesSchema[] = {
{.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "name", .bytes = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "value", .bytes = TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "scope", .bytes = TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
};
static const SSysDbTableSchema topicSchema[] = {

View File

@ -299,38 +299,38 @@ static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *input
}
int32_t taosAddClientLogCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "configDir", configDir, 1) != 0) return -1;
if (cfgAddDir(pCfg, "scriptDir", configDir, 1) != 0) return -1;
if (cfgAddDir(pCfg, "logDir", tsLogDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalLogDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfLogLines", tsNumOfLogLines, 1000, 2000000000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "asyncLog", tsAsyncLog, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "logKeepDays", 0, -365000, 365000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "tmrDebugFlag", tmrDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "uDebugFlag", uDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcDebugFlag", rpcDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, 1) != 0) return -1;
if (cfgAddDir(pCfg, "configDir", configDir, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddDir(pCfg, "scriptDir", configDir, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddDir(pCfg, "logDir", tsLogDir, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalLogDirGB", 1.0f, 0.001f, 10000000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfLogLines", tsNumOfLogLines, 1000, 2000000000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "asyncLog", tsAsyncLog, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "logKeepDays", 0, -365000, 365000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "debugFlag", 0, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "simDebugFlag", 143, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "tmrDebugFlag", tmrDebugFlag, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "uDebugFlag", uDebugFlag, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcDebugFlag", rpcDebugFlag, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "jniDebugFlag", jniDebugFlag, 0, 255, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "cDebugFlag", cDebugFlag, 0, 255, CFG_SCOPE_CLIENT) != 0) return -1;
return 0;
}
static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "vDebugFlag", vDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mDebugFlag", mDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "wDebugFlag", wDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "dDebugFlag", dDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "vDebugFlag", vDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "mDebugFlag", mDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "wDebugFlag", wDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "tdbDebugFlag", tdbDebugFlag, 0, 255, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "metaDebugFlag", metaDebugFlag, 0, 255, 0) != CFG_SCOPE_SERVER) return -1;
return 0;
}
@ -341,53 +341,52 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
strcpy(defaultFqdn, "localhost");
}
if (cfgAddString(pCfg, "firstEp", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "secondEp", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, 1) != 0) return -1;
if (cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, false) != 0) return -1;
if (cfgAddBool(pCfg, "enableScience", tsEnableScience, false) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
// if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
// if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, 1) != 0)
return -1;
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, true) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", "", true) != 0) return -1;
if (cfgAddString(pCfg, "firstEp", "", CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "secondEp", "", CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "fqdn", defaultFqdn, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepColumnName, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, CFG_SCOPE_CLIENT) != 0) return -1;
// if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, CFG_SCOPE_CLIENT) != 0) return -1;
// if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "maxInsertBatchRows", tsMaxInsertBatchRows, 1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "queryMaxConcurrentTables", tsQueryMaxConcurrentTables, INT64_MIN, INT64_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "metaCacheMaxSize", tsMetaCacheMaxSize, -1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "slowLogThreshold", tsSlowLogThreshold, 0, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddString(pCfg, "slowLogScope", "", CFG_SCOPE_CLIENT) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH) != 0) return -1;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 10000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, CFG_SCOPE_BOTH) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
if (tsNumOfTaskQueueThreads >= 10) {
tsNumOfTaskQueueThreads = 10;
}
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT) != 0) return -1;
return 0;
}
@ -395,92 +394,92 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
static int32_t taosAddSystemCfg(SConfig *pCfg) {
SysNameInfo info = taosGetSysNameInfo();
if (cfgAddTimezone(pCfg, "timezone", tsTimezoneStr) != 0) return -1;
if (cfgAddLocale(pCfg, "locale", tsLocale) != 0) return -1;
if (cfgAddCharset(pCfg, "charset", tsCharset) != 0) return -1;
if (cfgAddBool(pCfg, "assert", 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "enableCoreFile", 1, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, 1) != 0) return -1;
if (cfgAddTimezone(pCfg, "timezone", tsTimezoneStr, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddLocale(pCfg, "locale", tsLocale, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddCharset(pCfg, "charset", tsCharset, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "assert", 1, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "enableCoreFile", 1, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "SSE42", tsSSE42Enable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "AVX", tsAVXEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "FMA", tsFMAEnable, 0) != 0) return -1;
if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, 0) != 0) return -1;
if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, 0) != 0) return -1;
if (cfgAddBool(pCfg, "SSE42", tsSSE42Enable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "AVX", tsAVXEnable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "AVX2", tsAVX2Enable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "FMA", tsFMAEnable, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "SIMD-builtins", tsSIMDBuiltins, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "tagFilterCache", tsTagFilterCache, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt64(pCfg, "openMax", tsOpenMax, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1;
#if !defined(_ALPINE)
if (cfgAddInt64(pCfg, "streamMax", tsStreamMax, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt64(pCfg, "streamMax", tsStreamMax, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1;
#endif
if (cfgAddInt32(pCfg, "pageSizeKB", tsPageSizeKB, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddInt64(pCfg, "totalMemoryKB", tsTotalMemoryKB, 0, INT64_MAX, 1) != 0) return -1;
if (cfgAddString(pCfg, "os sysname", info.sysname, 1) != 0) return -1;
if (cfgAddString(pCfg, "os nodename", info.nodename, 1) != 0) return -1;
if (cfgAddString(pCfg, "os release", info.release, 1) != 0) return -1;
if (cfgAddString(pCfg, "os version", info.version, 1) != 0) return -1;
if (cfgAddString(pCfg, "os machine", info.machine, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "pageSizeKB", tsPageSizeKB, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt64(pCfg, "totalMemoryKB", tsTotalMemoryKB, 0, INT64_MAX, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "os sysname", info.sysname, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "os nodename", info.nodename, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "os release", info.release, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "os version", info.version, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "os machine", info.machine, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "version", version, 1) != 0) return -1;
if (cfgAddString(pCfg, "compatible_version", compatible_version, 1) != 0) return -1;
if (cfgAddString(pCfg, "gitinfo", gitinfo, 1) != 0) return -1;
if (cfgAddString(pCfg, "buildinfo", buildinfo, 1) != 0) return -1;
if (cfgAddString(pCfg, "version", version, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "compatible_version", compatible_version, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "gitinfo", gitinfo, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "buildinfo", buildinfo, CFG_SCOPE_BOTH) != 0) return -1;
return 0;
}
static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "dataDir", tsDataDir, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, 0) != 0) return -1;
if (cfgAddDir(pCfg, "dataDir", tsDataDir, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalDataDirGB", 2.0f, 0.001f, 10000000, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfSupportVnodes = tsNumOfCores * 2;
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxShellConns", tsMaxShellConns, 10, 50000000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, CFG_SCOPE_CLIENT) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 2, TSDB_MAX_RPC_THREADS);
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfRpcThreads", tsNumOfRpcThreads, 1, 1024, CFG_SCOPE_BOTH) != 0) return -1;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsNumOfRpcSessions, 20, 1000000, CFG_SCOPE_BOTH) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfVnodeQueryThreads = tsNumOfCores * 2;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "ratioOfVnodeStreamThreads", tsRatioOfVnodeStreamThreads, 0.01, 100, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfVnodeRsmaThreads = tsNumOfCores;
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfQnodeQueryThreads = tsNumOfCores * 2;
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
@ -488,67 +487,67 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeStreamThreads = tsNumOfCores / 4;
tsNumOfSnodeStreamThreads = TRANGE(tsNumOfSnodeStreamThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, 0) != 0)
if (cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_BOTH) != 0)
return -1;
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "skipGrant", tsMndSkipGrant, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "monitorComp", tsMonitorComp, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, 0) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, 0) != 0) return -1;
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, CFG_SCOPE_BOTH) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, 0) != 0)
if (cfgAddInt64(pCfg, "walFsyncDataSizeLimit", tsWalFsyncDataSizeLimit, 100 * 1024 * 1024, INT64_MAX, CFG_SCOPE_SERVER) != 0)
return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
if (cfgAddBool(pCfg, "udf", tsStartUdfd, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, 0) != 0) return -1;
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "keepTimeOffset", tsKeepTimeOffset, 0, 23, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, 0) != 0) return -1;
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "keepTimeOffset", tsKeepTimeOffset, 0, 23, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER) != 0) return -1;
GRANT_CFG_ADD;
return 0;

View File

@ -3484,12 +3484,14 @@ int32_t tDeserializeSShowVariablesReq(void *buf, int32_t bufLen, SShowVariablesR
int32_t tEncodeSVariablesInfo(SEncoder *pEncoder, SVariablesInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->value) < 0) return -1;
if (tEncodeCStr(pEncoder, pInfo->scope) < 0) return -1;
return 0;
}
int32_t tDecodeSVariablesInfo(SDecoder *pDecoder, SVariablesInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pInfo->value) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pInfo->scope) < 0) return -1;
return 0;
}

View File

@ -265,6 +265,12 @@ int32_t dmAppendVariablesToBlock(SSDataBlock *pBlock, int32_t dnodeId) {
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen);
varDataSetLen(scope, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false);
numOfRows++;
}

View File

@ -46,6 +46,7 @@ typedef struct {
int32_t vgId;
int32_t vgVersion;
int8_t dropped;
int32_t diskPrimary;
int32_t toVgId;
char path[PATH_MAX + 20];
} SWrapperCfg;
@ -56,6 +57,7 @@ typedef struct {
int32_t refCount;
int8_t dropped;
int8_t disable;
int32_t diskPrimary;
int32_t toVgId;
char *path;
SVnode *pImpl;
@ -81,6 +83,7 @@ typedef struct {
} SVnodeThread;
// vmInt.c
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId);
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId);
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);

View File

@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "diskPrimary", pCfg->diskPrimary, code);
if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code);
if (code < 0) goto _OVER;
@ -167,6 +169,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "diskPrimary", pVnode->diskPrimary) < 0) return -1;
if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1;
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
}

View File

@ -263,16 +263,19 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0;
}
wrapperCfg.diskPrimary = vmAllocPrimaryDisk(pMgmt, vnodeCfg.vgId);
int32_t diskPrimary = wrapperCfg.diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
if (vnodeCreate(path, &vnodeCfg, diskPrimary, pMgmt->pTfs) < 0) {
tFreeSCreateVnodeReq(&req);
dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr());
code = terrno;
goto _OVER;
}
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode since %s", req.vgId, terrstr());
code = terrno;
@ -403,21 +406,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.dropped = pVnode->dropped,
.vgId = pVnode->vgId,
.vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode, false);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlterReplica(path, &req, pMgmt->pTfs) < 0) {
if (vnodeAlterReplica(path, &req, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1;
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;
@ -490,6 +495,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.dropped = pVnode->dropped,
.vgId = dstVgId,
.vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
@ -503,19 +509,20 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, close vnode", srcVgId);
vmCloseVnode(pMgmt, pVnode, true);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char srcPath[TSDB_FILENAME_LEN] = {0};
char dstPath[TSDB_FILENAME_LEN] = {0};
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
dInfo("vgId:%d, alter vnode hashrange at %s", srcVgId, srcPath);
if (vnodeAlterHashRange(srcPath, dstPath, &req, pMgmt->pTfs) < 0) {
if (vnodeAlterHashRange(srcPath, dstPath, &req, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode hashrange since %s", srcVgId, terrstr());
return -1;
}
dInfo("vgId:%d, open vnode", dstVgId);
SVnode *pImpl = vnodeOpen(dstPath, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(dstPath, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", dstVgId, dstPath, terrstr());
return -1;
@ -602,21 +609,23 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
.dropped = pVnode->dropped,
.vgId = pVnode->vgId,
.vgVersion = pVnode->vgVersion,
.diskPrimary = pVnode->diskPrimary,
};
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
vmCloseVnode(pMgmt, pVnode, false);
int32_t diskPrimary = wrapperCfg.diskPrimary;
char path[TSDB_FILENAME_LEN] = {0};
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vgId);
dInfo("vgId:%d, start to alter vnode replica at %s", vgId, path);
if (vnodeAlterReplica(path, &alterReq, pMgmt->pTfs) < 0) {
if (vnodeAlterReplica(path, &alterReq, diskPrimary, pMgmt->pTfs) < 0) {
dError("vgId:%d, failed to alter vnode at %s since %s", vgId, path, terrstr());
return -1;
}
dInfo("vgId:%d, begin to open vnode", vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode at %s since %s", vgId, path, terrstr());
return -1;

View File

@ -15,8 +15,64 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;
int32_t diskId = 0;
if (!pTfs) {
return diskId;
}
// search fs
char vnodePath[TSDB_FILENAME_LEN] = {0};
snprintf(vnodePath, TSDB_FILENAME_LEN - 1, "vnode%svnode%d", TD_DIRSEP, vgId);
char fname[TSDB_FILENAME_LEN] = {0};
char fnameTmp[TSDB_FILENAME_LEN] = {0};
snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME);
snprintf(fnameTmp, TSDB_FILENAME_LEN - 1, "%s%s%s", vnodePath, TD_DIRSEP, VND_INFO_FNAME_TMP);
diskId = tfsSearch(pTfs, 0, fname);
if (diskId >= 0) {
return diskId;
}
diskId = tfsSearch(pTfs, 0, fnameTmp);
if (diskId >= 0) {
return diskId;
}
// alloc
int32_t disks[TFS_MAX_DISKS_PER_TIER] = {0};
int32_t numOfVnodes = 0;
SVnodeObj **ppVnodes = vmGetVnodeListFromHash(pMgmt, &numOfVnodes);
for (int32_t v = 0; v < numOfVnodes; v++) {
SVnodeObj *pVnode = ppVnodes[v];
disks[pVnode->diskPrimary] += 1;
}
int32_t minVal = INT_MAX;
int32_t ndisk = tfsGetDisksAtLevel(pTfs, 0);
diskId = 0;
for (int32_t id = 0; id < ndisk; id++) {
if (minVal > disks[id]) {
minVal = disks[id];
diskId = id;
}
}
for (int32_t i = 0; i < numOfVnodes; ++i) {
if (ppVnodes == NULL || ppVnodes[i] == NULL) continue;
vmReleaseVnode(pMgmt, ppVnodes[i]);
}
if (ppVnodes != NULL) {
taosMemoryFree(ppVnodes);
}
dInfo("vgId:%d, alloc disk:%d of level 0. ndisk:%d, vnodes: %d", vgId, diskId, ndisk, numOfVnodes);
return diskId;
}
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL;
@ -52,6 +108,7 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->vgId = pCfg->vgId;
pVnode->vgVersion = pCfg->vgVersion;
pVnode->diskPrimary = pCfg->diskPrimary;
pVnode->refCount = 0;
pVnode->dropped = 0;
pVnode->path = taosStrdup(pCfg->path);
@ -169,7 +226,8 @@ static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs);
int32_t diskPrimary = pCfg->diskPrimary;
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs);
if (vgId <= 0) {
dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
return -1;
@ -205,11 +263,12 @@ static void *vmOpenVnodeInThread(void *param) {
pThread->updateVnodesList = true;
}
int32_t diskPrimary = pCfg->diskPrimary;
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
SVnode *pImpl = vnodeOpen(path, diskPrimary, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
dError("vgId:%d, failed to open vnode by thread:%d since %s", pCfg->vgId, pThread->threadIndex, terrstr());
pThread->failed++;
continue;
}
@ -296,6 +355,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
terrno = TSDB_CODE_VND_INIT_FAILED;
return -1;
}
@ -518,7 +578,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
tmsgReportStartup("vnode-worker", "initialized");
if (vmOpenVnodes(pMgmt) != 0) {
dError("failed to open vnode since %s", terrstr());
dError("failed to open all vnodes since %s", terrstr());
goto _OVER;
}
tmsgReportStartup("vnode-vnodes", "initialized");

View File

@ -785,18 +785,22 @@ static int32_t mndProcessShowVariablesReq(SRpcMsg *pReq) {
strcpy(info.name, "statusInterval");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%d", tsStatusInterval);
strcpy(info.scope, "server");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "timezone");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsTimezoneStr);
strcpy(info.scope, "both");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "locale");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsLocale);
strcpy(info.scope, "both");
taosArrayPush(rsp.variables, &info);
strcpy(info.name, "charset");
snprintf(info.value, TSDB_CONFIG_VALUE_LEN, "%s", tsCharset);
strcpy(info.scope, "both");
taosArrayPush(rsp.variables, &info);
int32_t rspLen = tSerializeSShowVariablesRsp(NULL, 0, &rsp);

View File

@ -51,12 +51,14 @@ extern const SVnodeCfg vnodeCfgDefault;
int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs);
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
int32_t diskPrimary, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode);
void vnodeSyncCheckTimeout(SVnode *pVnode);

View File

@ -87,7 +87,7 @@ void vnodeBufPoolAddToFreeList(SVBufPool* pPool);
int32_t vnodeBufPoolRecycle(SVBufPool* pPool);
// vnodeOpen.c
int32_t vnodeGetPrimaryDir(const char* relPath, STfs* pTfs, char* buf, size_t bufLen);
int32_t vnodeGetPrimaryDir(const char* relPath, int32_t diskPrimary, STfs* pTfs, char* buf, size_t bufLen);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);

View File

@ -93,6 +93,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
// vnd.h
typedef int32_t (*_query_reseek_func_t)(void* pQHandle);
@ -385,6 +386,7 @@ struct SVnode {
SVState state;
SVStatis statis;
STfs* pTfs;
int32_t diskPrimary;
SMsgCb msgCb;
// Buffer Pool

View File

@ -41,7 +41,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
*ppMeta = NULL;
// create handle
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VNODE_META_DIR);

View File

@ -379,7 +379,7 @@ _out:
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ttlMgrWLock(pTtlMgr);
metaInfo("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
metaDebug("%s, ttl mgr flush start. dirty uids:%d", pTtlMgr->logPrefix, taosHashGetSize(pTtlMgr->pDirtyUids));
int ret = -1;
@ -433,7 +433,7 @@ int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
_out:
ttlMgrULock(pTtlMgr);
metaInfo("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
metaDebug("%s, ttl mgr flush end.", pTtlMgr->logPrefix);
return ret;
}

View File

@ -26,7 +26,7 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
int32_t offset = 0;
// vnode
vnodeGetPrimaryDir(pVnode->path, pTfs, outputName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pTfs, outputName, TSDB_FILENAME_LEN);
offset = strlen(outputName);
// rsma

View File

@ -59,7 +59,7 @@ typedef struct {
static void tsdbGetRocksPath(STsdb *pTsdb, char *path) {
SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, path, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
int32_t offset = strlen(path);
snprintf(path + offset, TSDB_FILENAME_LEN - offset - 1, "%scache.rdb", TD_DIRSEP);

View File

@ -276,14 +276,14 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
// CURRENT
if (current) {
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current, TSDB_FILENAME_LEN);
offset = strlen(current);
snprintf(current + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT", TD_DIRSEP);
}
// CURRENT.t
if (current_t) {
vnodeGetPrimaryDir(pTsdb->path, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, current_t, TSDB_FILENAME_LEN);
offset = strlen(current_t);
snprintf(current_t + offset, TSDB_FILENAME_LEN - offset - 1, "%sCURRENT.t", TD_DIRSEP);
}

View File

@ -284,8 +284,9 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) {
// SDelFile ===============================================
void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]) {
int32_t offset = 0;
SVnode *pVnode = pTsdb->pVnode;
vnodeGetPrimaryDir(pTsdb->path, pTsdb->pVnode->pTfs, fname, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pTsdb->path, pVnode->diskPrimary, pVnode->pTfs, fname, TSDB_FILENAME_LEN);
offset = strlen(fname);
snprintf((char *)fname + offset, TSDB_FILENAME_LEN - offset - 1, "%sv%dver%" PRId64 ".del", TD_DIRSEP,
TD_VID(pTsdb->pVnode), pFile->commitID);

View File

@ -16,8 +16,6 @@
#include "vnd.h"
#include "vnodeInt.h"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeCommitImpl(SCommitInfo *pInfo);
@ -290,7 +288,7 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo->txn = metaGetTxn(pVnode->pMeta);
// save info
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vDebug("vgId:%d, save config while prepare commit", TD_VID(pVnode));
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
@ -428,7 +426,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
return -1;
}
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
syncBeginSnapshot(pVnode->sync, pInfo->info.state.committed);
@ -492,7 +490,7 @@ bool vnodeShouldRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);
@ -503,7 +501,7 @@ void vnodeRollback(SVnode *pVnode) {
char tFName[TSDB_FILENAME_LEN] = {0};
int32_t offset = 0;
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, tFName, TSDB_FILENAME_LEN);
offset = strlen(tFName);
snprintf(tFName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME_TMP);

View File

@ -15,9 +15,11 @@
#include "vnd.h"
int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bufLen) {
int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) {
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, relPath);
SDiskID diskId = {0};
diskId.id = diskPrimary;
snprintf(buf, bufLen - 1, "%s%s%s", tfsGetDiskPath(pTfs, diskId), TD_DIRSEP, relPath);
} else {
snprintf(buf, bufLen - 1, "%s", relPath);
}
@ -25,7 +27,15 @@ int32_t vnodeGetPrimaryDir(const char *relPath, STfs *pTfs, char *buf, size_t bu
return 0;
}
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
static int32_t vnodeMkDir(STfs *pTfs, const char *path) {
if (pTfs) {
return tfsMkdirRecur(pTfs, path);
} else {
return taosMkDir(path);
}
}
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
@ -36,10 +46,11 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
}
// create vnode env
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
if (taosMkDir(dir)) {
if (vnodeMkDir(pTfs, path)) {
vError("vgId:%d, failed to prepare vnode dir since %s, path: %s", pCfg->vgId, strerror(errno), path);
return TAOS_SYSTEM_ERROR(errno);
}
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (pCfg) {
info.config = *pCfg;
@ -60,12 +71,12 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
return 0;
}
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
@ -133,7 +144,8 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
return strlen(tmp);
}
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs) {
int32_t ret = 0;
char oldRname[TSDB_FILENAME_LEN] = {0};
@ -164,7 +176,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);
vInfo("vgId:%d, rename file from %s to %s", dstVgId, tsdbFile->rname, newRname);
ret = tfsRename(pTfs, tsdbFile->rname, newRname);
ret = tfsRename(pTfs, diskPrimary, tsdbFile->rname, newRname);
if (ret != 0) {
vError("vgId:%d, failed to rename file from %s to %s since %s", dstVgId, tsdbFile->rname, newRname, terrstr());
tfsClosedir(tsdbDir);
@ -176,19 +188,20 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
tfsClosedir(tsdbDir);
vInfo("vgId:%d, rename dir from %s to %s", dstVgId, srcPath, dstPath);
ret = tfsRename(pTfs, srcPath, dstPath);
ret = tfsRename(pTfs, diskPrimary, srcPath, dstPath);
if (ret != 0) {
vError("vgId:%d, failed to rename dir from %s to %s since %s", dstVgId, srcPath, dstPath, terrstr());
}
return ret;
}
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs) {
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq,
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
int32_t ret = 0;
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
@ -232,7 +245,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
}
vInfo("vgId:%d, rename %s to %s", pReq->dstVgId, srcPath, dstPath);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, pTfs);
ret = vnodeRenameVgroupId(srcPath, dstPath, pReq->srcVgId, pReq->dstVgId, diskPrimary, pTfs);
if (ret < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", pReq->dstVgId, srcPath, dstPath,
tstrerror(terrno));
@ -243,11 +256,12 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return 0;
}
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) {
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs) {
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(dstPath, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(dstPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) == 0) {
if (info.config.vgId != dstVgId) {
vError("vgId:%d, unexpected vnode config.vgId:%d", dstVgId, info.config.vgId);
@ -256,7 +270,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return dstVgId;
}
vnodeGetPrimaryDir(srcPath, pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(srcPath, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &info) < 0) {
vError("vgId:%d, failed to read vnode config from %s since %s", srcVgId, srcPath, tstrerror(terrno));
return -1;
@ -271,7 +285,7 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
}
vInfo("vgId:%d, rename %s to %s", dstVgId, srcPath, dstPath);
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs) < 0) {
if (vnodeRenameVgroupId(srcPath, dstPath, srcVgId, dstVgId, diskPrimary, pTfs) < 0) {
vError("vgId:%d, failed to rename vnode from %s to %s since %s", dstVgId, srcPath, dstPath, tstrerror(terrno));
return -1;
}
@ -284,14 +298,31 @@ void vnodeDestroy(const char *path, STfs *pTfs) {
tfsRmdir(pTfs, path);
}
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
int32_t ndisk = 1;
if (pTfs) {
ndisk = tfsGetDisksAtLevel(pTfs, 0);
}
if (diskPrimary < 0 || diskPrimary >= ndisk) {
vError("disk:%d is unavailable from the %d disks mounted at level 0", diskPrimary, ndisk);
terrno = TSDB_CODE_FS_INVLD_CFG;
return -1;
}
return 0;
}
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN] = {0};
char tdir[TSDB_FILENAME_LEN * 2] = {0};
int32_t ret = 0;
vnodeGetPrimaryDir(path, pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeCheckDisk(diskPrimary, pTfs)) {
vError("failed to open vnode from %s since %s. diskPrimary:%d", path, terrstr(), diskPrimary);
return NULL;
}
vnodeGetPrimaryDir(path, diskPrimary, pTfs, dir, TSDB_FILENAME_LEN);
info.config = vnodeCfgDefault;
@ -334,6 +365,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->state.applied = info.state.committed;
pVnode->state.applyTerm = info.state.commitTerm;
pVnode->pTfs = pTfs;
pVnode->diskPrimary = diskPrimary;
pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL);
pVnode->blocked = false;

View File

@ -35,7 +35,7 @@ static int32_t vnodePrepareRentention(SVnode *pVnode, SRetentionInfo *pInfo) {
pInfo->commitID = ++pVnode->state.commitID;
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
if (vnodeLoadInfo(dir, &pInfo->info) < 0) {
code = terrno;
@ -60,7 +60,7 @@ static int32_t vnodeRetentionTask(void *param) {
SVnode *pVnode = pInfo->pVnode;
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
// save info
pInfo->info.state.commitID = pInfo->commitID;

View File

@ -86,6 +86,7 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) {
int32_t code = 0;
SVnode *pVnode = pReader->pVnode;
// CONFIG ==============
// FIXME: if commit multiple times and the config changed?
@ -93,7 +94,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
char fName[TSDB_FILENAME_LEN];
int32_t offset = 0;
vnodeGetPrimaryDir(pReader->pVnode->path, pReader->pVnode->pTfs, fName, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, fName, TSDB_FILENAME_LEN);
offset = strlen(fName);
snprintf(fName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s", TD_DIRSEP, VND_INFO_FNAME);
@ -343,7 +344,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
.applyTerm = pWriter->info.state.commitTerm};
pVnode->statis = pWriter->info.statis;
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pVnode->path, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeCommitInfo(dir);
} else {
@ -381,7 +382,7 @@ _exit:
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0;
SVnode *pVnode = pWriter->pVnode;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;
// decode info
@ -395,10 +396,9 @@ static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_
// modify info as needed
char dir[TSDB_FILENAME_LEN] = {0};
vnodeGetPrimaryDir(pWriter->pVnode->path, pWriter->pVnode->pTfs, dir, TSDB_FILENAME_LEN);
vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, dir, TSDB_FILENAME_LEN);
SVnodeStats vndStats = pWriter->info.config.vndStats;
SVnode *pVnode = pWriter->pVnode;
pWriter->info.config = pVnode->config;
pWriter->info.config.vndStats = vndStats;
vDebug("vgId:%d, save config while write snapshot", pWriter->pVnode->config.vgId);

View File

@ -773,12 +773,6 @@ int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
int32_t code = 0;
if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
*inCache = true;
pRes->pRawRes->pass = true;
ctgDebug("sysdb %s, pass", pReq->tbName.dbname);
return TSDB_CODE_SUCCESS;
}
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, pReq->user, strlen(pReq->user));
if (NULL == pUser) {

View File

@ -1589,6 +1589,12 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
return TSDB_CODE_SUCCESS;
}
if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
pRes->pass = true;
ctgDebug("sysdb %s, pass", pReq->tbName.dbname);
return TSDB_CODE_SUCCESS;
}
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&pReq->tbName, dbFName);

View File

@ -802,15 +802,19 @@ static int32_t buildLocalVariablesResultDataBlock(SSDataBlock** pOutput) {
pBlock->pDataBlock = taosArrayInit(SHOW_LOCAL_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD1_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = SHOW_LOCAL_VARIABLES_RESULT_FIELD3_LEN;
taosArrayPush(pBlock->pDataBlock, &infoData);
*pOutput = pBlock;
return TSDB_CODE_SUCCESS;
}
@ -823,6 +827,7 @@ int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
SConfigItem* pItem = taosArrayGet(tsCfg->array, i);
GRANT_CFG_SKIP;
char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(name, pItem->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
@ -835,6 +840,12 @@ int32_t setLocalVariablesResultIntoDataBlock(SSDataBlock* pBlock) {
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, value, false);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen);
varDataSetLen(scope, valueLen);
pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
colDataSetVal(pColInfo, i, scope, false);
numOfRows++;
}

View File

@ -366,7 +366,6 @@ typedef struct SStreamScanInfo {
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes;

View File

@ -1820,9 +1820,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN2) {
if (pInfo->blockRecoverContiCnt > 100) {
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
pInfo->blockRecoverContiCnt = 0;
if (isTaskKilled(pTaskInfo)) {
return NULL;
}
@ -1867,7 +1865,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pRecoverRes = doTableScan(pInfo->pTableScanOp);
if (pInfo->pRecoverRes != NULL) {
pInfo->blockRecoverContiCnt++;
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {

View File

@ -2914,6 +2914,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pWinBlock);
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted);
taosArrayDestroy(pInfo->historyWins);
@ -3008,14 +3009,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx,
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
}
if (pHandle) {
pSup->winRange = pHandle->winRange;
// temporary
if (pSup->winRange.ekey <= 0) {
pSup->winRange.ekey = INT64_MAX;
}
}
pSup->pSessionAPI = pApi;
return TSDB_CODE_SUCCESS;
@ -3063,11 +3056,12 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->pOutputBuf, &pAggSup->pSessionAPI->stateStore);
pCurWin->pOutputBuf = taosMemoryMalloc(size);
pCurWin->pOutputBuf = taosMemoryCalloc(1, size);
}
if (code == TSDB_CODE_SUCCESS) {
pCurWin->isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->sessionWin);
} else {
pCurWin->sessionWin.win.skey = startTs;
pCurWin->sessionWin.win.ekey = endTs;
@ -3198,7 +3192,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
}
static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
SSHashObj* pStDeleted) {
SSHashObj* pStDeleted, bool addGap) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
@ -3222,7 +3216,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) {
if (addGap) {
winDelta = pAggSup->gap;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta);
@ -3240,6 +3234,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL;
return TSDB_CODE_SUCCESS;
}
@ -3253,8 +3248,13 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SResultRow* pResult = NULL;
int32_t rows = pSDataBlock->info.rows;
int32_t winRows = 0;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pAggSup->winRange.ekey <= 0) {
pAggSup->winRange.ekey = INT64_MAX;
}
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* startTsCols = (int64_t*)pStartTsCol->pData;
@ -3266,7 +3266,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < rows;) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
@ -3291,7 +3290,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted);
compactSessionWindow(pOperator, &winInfo, pStUpdated, pStDeleted, addGap);
saveSessionOutputBuf(pAggSup, &winInfo);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
@ -3455,7 +3454,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL);
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
saveResult(parentWin, pStUpdated);
} else {
break;
@ -3707,8 +3706,8 @@ void streamSessionReleaseState(SOperatorInfo* pOperator) {
}
void resetWinRange(STimeWindow* winRange) {
winRange->skey = INT16_MIN;
winRange->skey = INT16_MAX;
winRange->skey = INT64_MIN;
winRange->ekey = INT64_MAX;
}
void streamSessionReloadState(SOperatorInfo* pOperator) {
@ -3724,10 +3723,16 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pStUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pStUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SResultWindowInfo winInfo = {0};
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
saveSessionOutputBuf(pAggSup, &winInfo);
saveResult(winInfo, pInfo->pStUpdated);
}
taosMemoryFree(pBuf);
@ -4019,6 +4024,7 @@ void destroyStreamStateOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes);
taosArrayDestroy(pInfo->historyWins);
tSimpleHashCleanup(pInfo->pSeUpdated);
tSimpleHashCleanup(pInfo->pSeDeleted);
taosMemoryFreeClear(param);
}
@ -4073,6 +4079,7 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS) {
pCurWin->winInfo.isOutput = true;
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
} else if (pKeyData) {
if (IS_VAR_DATA_TYPE(pAggSup->stateKeyType)) {
varDataCopy(pCurWin->pStateKey->pData, pKeyData);
@ -4145,8 +4152,13 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
TSKEY* tsCols = NULL;
SResultRow* pResult = NULL;
int32_t winRows = 0;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pAggSup->winRange.ekey <= 0) {
pAggSup->winRange.ekey = INT64_MAX;
}
if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
@ -4155,7 +4167,6 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
return;
}
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
int32_t rows = pSDataBlock->info.rows;
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
@ -4335,7 +4346,7 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, 1);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) {
@ -4344,11 +4355,10 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
doDeleteSessionWindow(pAggSup, &pNextWin->sessionWin);
taosMemoryFree(pNextWin->pOutputBuf);
saveSessionOutputBuf(pAggSup, pCurWin);
}
void streamStateReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
@ -4360,14 +4370,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeUpdated = tSimpleHashInit(64, hashFn);
}
for (int32_t i = 0; i < num; i++) {
SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0};
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted);
saveResult(curInfo.winInfo, pInfo->pStUpdated);
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
}
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
@ -4855,7 +4870,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &win, startPos, forwardRows, pExprSup);
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult);

View File

@ -7719,7 +7719,7 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema**
}
static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pSchema) {
*numOfCols = 2;
*numOfCols = 3;
*pSchema = taosMemoryCalloc((*numOfCols), sizeof(SSchema));
if (NULL == (*pSchema)) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -7733,6 +7733,10 @@ static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pS
(*pSchema)[1].bytes = TSDB_CONFIG_VALUE_LEN;
strcpy((*pSchema)[1].name, "value");
(*pSchema)[2].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[2].bytes = TSDB_CONFIG_SCOPE_LEN;
strcpy((*pSchema)[2].name, "scope");
return TSDB_CODE_SUCCESS;
}

View File

@ -1864,7 +1864,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
taosMemoryFreeClear(*pVal);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
taosMemoryFreeClear(*pVal);
@ -1880,7 +1879,6 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
}
@ -1938,14 +1936,12 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
@ -1961,7 +1957,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
}

View File

@ -113,6 +113,15 @@ SDiskSize tfsGetSize(STfs *pTfs) {
return size;
}
int32_t tfsGetDisksAtLevel(STfs *pTfs, int32_t level) {
if (level < 0 || level >= pTfs->nlevel) {
return 0;
}
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
return pTier->ndisk;
}
int32_t tfsGetLevel(STfs *pTfs) { return pTfs->nlevel; }
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
@ -272,6 +281,20 @@ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
return 0;
}
int32_t tfsMkdirRecur(STfs *pTfs, const char *rname) {
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
SDiskID did = {.id = id, .level = level};
if (tfsMkdirRecurAt(pTfs, rname, did) < 0) {
return -1;
}
}
}
return 0;
}
int32_t tfsMkdir(STfs *pTfs, const char *rname) {
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
@ -314,25 +337,60 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
return 0;
}
int32_t tfsRename(STfs *pTfs, const char *orname, const char *nrname) {
static int32_t tfsRenameAt(STfs *pTfs, SDiskID diskId, const char *orname, const char *nrname) {
char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0";
for (int32_t level = pTfs->nlevel - 1; level >= 0; level--) {
int32_t level = diskId.level;
int32_t id = diskId.id;
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = pTier->ndisk - 1; id >= 0; id--) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
if (taosRenameFile(oaname, naname) != 0 && errno != ENOENT) {
terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to rename %s to %s since %s", oaname, naname, terrstr());
return -1;
}
return 0;
}
int32_t tfsRename(STfs *pTfs, int32_t diskPrimary, const char *orname, const char *nrname) {
for (int32_t level = pTfs->nlevel - 1; level >= 0; level--) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = pTier->ndisk - 1; id >= 0; id--) {
if (level == 0 && id == diskPrimary) {
continue;
}
SDiskID diskId = {.level = level, .id = id};
if (tfsRenameAt(pTfs, diskId, orname, nrname)) {
return -1;
}
}
}
return 0;
SDiskID diskId = {.level = 0, .id = diskPrimary};
return tfsRenameAt(pTfs, diskId, orname, nrname);
}
int32_t tfsSearch(STfs *pTfs, int32_t level, const char *fname) {
if (level < 0 || level >= pTfs->nlevel) {
return -1;
}
char path[TMPNAME_LEN] = {0};
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(path, TMPNAME_LEN - 1, "%s%s%s", pDisk->path, TD_DIRSEP, fname);
if (taosCheckExistFile(path)) {
return id;
}
}
return -1;
}
STfsDir *tfsOpendir(STfs *pTfs, const char *rname) {

View File

@ -156,7 +156,7 @@ TEST_F(TfsTest, 03_Dir) {
EXPECT_NE(taosDirExist(ap4), 1);
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0);
EXPECT_EQ(taosDirExist(ap4), 1);
EXPECT_EQ(tfsRename(pTfs, p44, p45), 0);
EXPECT_EQ(tfsRename(pTfs, 0, p44, p45), 0);
EXPECT_EQ(tfsRmdir(pTfs, p4), 0);
EXPECT_NE(taosDirExist(ap4), 1);
@ -609,7 +609,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
EXPECT_NE(taosDirExist(_ap22), 1);
EXPECT_EQ(tfsMkdirRecurAt(pTfs, p4, did), 0);
EXPECT_EQ(taosDirExist(_ap22), 1);
EXPECT_EQ(tfsRename(pTfs, p44, p45), 0);
EXPECT_EQ(tfsRename(pTfs, 0, p44, p45), 0);
EXPECT_EQ(tfsRmdir(pTfs, p4), 0);
EXPECT_NE(taosDirExist(_ap22), 1);
}

View File

@ -380,43 +380,43 @@ static int32_t cfgAddItem(SConfig *pCfg, SConfigItem *pItem, const char *name) {
return 0;
}
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, bool tsc) {
SConfigItem item = {.dtype = CFG_DTYPE_BOOL, .bval = defaultVal, .tsc = tsc};
int32_t cfgAddBool(SConfig *pCfg, const char *name, bool defaultVal, int8_t scope) {
SConfigItem item = {.dtype = CFG_DTYPE_BOOL, .bval = defaultVal, .scope = scope};
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, bool tsc) {
int32_t cfgAddInt32(SConfig *pCfg, const char *name, int32_t defaultVal, int64_t minval, int64_t maxval, int8_t scope) {
if (defaultVal < minval || defaultVal > maxval) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
SConfigItem item = {.dtype = CFG_DTYPE_INT32, .i32 = defaultVal, .imin = minval, .imax = maxval, .tsc = tsc};
SConfigItem item = {.dtype = CFG_DTYPE_INT32, .i32 = defaultVal, .imin = minval, .imax = maxval, .scope = scope};
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, bool tsc) {
int32_t cfgAddInt64(SConfig *pCfg, const char *name, int64_t defaultVal, int64_t minval, int64_t maxval, int8_t scope) {
if (defaultVal < minval || defaultVal > maxval) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
SConfigItem item = {.dtype = CFG_DTYPE_INT64, .i64 = defaultVal, .imin = minval, .imax = maxval, .tsc = tsc};
SConfigItem item = {.dtype = CFG_DTYPE_INT64, .i64 = defaultVal, .imin = minval, .imax = maxval, .scope = scope};
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, double minval, double maxval, bool tsc) {
int32_t cfgAddFloat(SConfig *pCfg, const char *name, float defaultVal, double minval, double maxval, int8_t scope) {
if (defaultVal < minval || defaultVal > maxval) {
terrno = TSDB_CODE_OUT_OF_RANGE;
return -1;
}
SConfigItem item = {.dtype = CFG_DTYPE_FLOAT, .fval = defaultVal, .fmin = minval, .fmax = maxval, .tsc = tsc};
SConfigItem item = {.dtype = CFG_DTYPE_FLOAT, .fval = defaultVal, .fmin = minval, .fmax = maxval, .scope = scope};
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, bool tsc) {
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .tsc = tsc};
int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
SConfigItem item = {.dtype = CFG_DTYPE_STRING, .scope = scope};
item.str = taosStrdup(defaultVal);
if (item.str == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -425,8 +425,8 @@ int32_t cfgAddString(SConfig *pCfg, const char *name, const char *defaultVal, bo
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, bool tsc) {
SConfigItem item = {.dtype = CFG_DTYPE_DIR, .tsc = tsc};
int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
SConfigItem item = {.dtype = CFG_DTYPE_DIR, .scope = scope};
if (cfgCheckAndSetDir(&item, defaultVal) != 0) {
return -1;
}
@ -434,8 +434,8 @@ int32_t cfgAddDir(SConfig *pCfg, const char *name, const char *defaultVal, bool
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal) {
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .tsc = 1};
int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
SConfigItem item = {.dtype = CFG_DTYPE_LOCALE, .scope = scope};
if (cfgCheckAndSetLocale(&item, defaultVal) != 0) {
return -1;
}
@ -443,8 +443,8 @@ int32_t cfgAddLocale(SConfig *pCfg, const char *name, const char *defaultVal) {
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal) {
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .tsc = 1};
int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
SConfigItem item = {.dtype = CFG_DTYPE_CHARSET, .scope = scope};
if (cfgCheckAndSetCharset(&item, defaultVal) != 0) {
return -1;
}
@ -452,8 +452,8 @@ int32_t cfgAddCharset(SConfig *pCfg, const char *name, const char *defaultVal) {
return cfgAddItem(pCfg, &item, name);
}
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal) {
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .tsc = 1};
int32_t cfgAddTimezone(SConfig *pCfg, const char *name, const char *defaultVal, int8_t scope) {
SConfigItem item = {.dtype = CFG_DTYPE_TIMEZONE, .scope = scope};
if (cfgCheckAndSetTimezone(&item, defaultVal) != 0) {
return -1;
}
@ -543,6 +543,27 @@ void cfgDumpItemValue(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *p
*pLen = len;
}
void cfgDumpItemScope(SConfigItem *pItem, char *buf, int32_t bufSize, int32_t *pLen) {
int32_t len = 0;
switch (pItem->scope) {
case CFG_SCOPE_SERVER:
len = snprintf(buf, bufSize, "server");
break;
case CFG_SCOPE_CLIENT:
len = snprintf(buf, bufSize, "client");
break;
case CFG_SCOPE_BOTH:
len = snprintf(buf, bufSize, "both");
break;
}
if (len > bufSize) {
len = bufSize;
}
*pLen = len;
}
void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
if (dump) {
printf(" global config");
@ -560,7 +581,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
int32_t size = taosArrayGetSize(pCfg->array);
for (int32_t i = 0; i < size; ++i) {
SConfigItem *pItem = taosArrayGet(pCfg->array, i);
if (tsc && !pItem->tsc) continue;
if (tsc && pItem->scope == CFG_SCOPE_SERVER) continue;
if (dump && strcmp(pItem->name, "scriptDir") == 0) continue;
if (dump && strcmp(pItem->name, "simDebugFlag") == 0) continue;
tstrncpy(src, cfgStypeStr(pItem->stype), CFG_SRC_PRINT_LEN);

View File

@ -329,6 +329,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE, "Please use this comma
// vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_VGROUP_ID, "Vnode is closed or removed")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INIT_FAILED, "Vnode init failure")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_WRITE_AUTH, "Database write operation denied")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_NOT_EXIST, "Vnode not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ALREADY_EXIST, "Vnode already exist")

View File

@ -160,6 +160,7 @@
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
,,n,system-test,python3 ./test.py -f 0-others/splitVGroup.py -N 5
,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_replica.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py

View File

@ -440,8 +440,10 @@ class TDSql:
time.sleep(1)
continue
def execute(self, sql,queryTimes=30):
def execute(self, sql, queryTimes=30, show=False):
self.sql = sql
if show:
tdLog.info(sql)
i=1
while i <= queryTimes:
try:

View File

@ -0,0 +1,309 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import random
import time
import copy
import string
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
# random string
def random_string(self, count):
letters = string.ascii_letters
return ''.join(random.choice(letters) for i in range(count))
# get col value and total max min ...
def getColsValue(self, i, j):
# c1 value
if random.randint(1, 10) == 5:
c1 = None
else:
c1 = 1
# c2 value
if j % 3200 == 0:
c2 = 8764231
elif random.randint(1, 10) == 5:
c2 = None
else:
c2 = random.randint(-87654297, 98765321)
value = f"({self.ts}, "
# c1
if c1 is None:
value += "null,"
else:
self.c1Cnt += 1
value += f"{c1},"
# c2
if c2 is None:
value += "null,"
else:
value += f"{c2},"
# total count
self.c2Cnt += 1
# max
if self.c2Max is None:
self.c2Max = c2
else:
if c2 > self.c2Max:
self.c2Max = c2
# min
if self.c2Min is None:
self.c2Min = c2
else:
if c2 < self.c2Min:
self.c2Min = c2
# sum
if self.c2Sum is None:
self.c2Sum = c2
else:
self.c2Sum += c2
# c3 same with ts
value += f"{self.ts})"
# move next 1s interval
self.ts += 1
return value
# insert data
def insertData(self):
tdLog.info("insert data ....")
sqls = ""
for i in range(self.childCnt):
# insert child table
values = ""
pre_insert = f"insert into @db_name.t{i} values "
for j in range(self.childRow):
if values == "":
values = self.getColsValue(i, j)
else:
values += "," + self.getColsValue(i, j)
# batch insert
if j % self.batchSize == 0 and values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# append last
if values != "":
sql = pre_insert + values
self.exeDouble(sql)
values = ""
# insert finished
tdLog.info(f"insert data successfully.\n"
f" inserted child table = {self.childCnt}\n"
f" inserted child rows = {self.childRow}\n"
f" total inserted rows = {self.childCnt*self.childRow}\n")
return
def exeDouble(self, sql):
# dbname replace
sql1 = sql.replace("@db_name", self.db1)
if len(sql1) > 100:
tdLog.info(sql1[:100])
else:
tdLog.info(sql1)
tdSql.execute(sql1)
sql2 = sql.replace("@db_name", self.db2)
if len(sql2) > 100:
tdLog.info(sql2[:100])
else:
tdLog.info(sql2)
tdSql.execute(sql2)
# prepareEnv
def prepareEnv(self):
# init
self.ts = 1680000000000
self.childCnt = 2
self.childRow = 100000
self.batchSize = 5000
self.vgroups1 = 4
self.vgroups2 = 4
self.db1 = "db1" # no sma
self.db2 = "db2" # have sma
self.smaClause = "interval(10s)"
# total
self.c1Cnt = 0
self.c2Cnt = 0
self.c2Max = None
self.c2Min = None
self.c2Sum = None
# alter local optimization to treu
sql = "alter local 'querysmaoptimize 1'"
tdSql.execute(sql, 5, True)
# check forbid mulit-replic on create sma index
sql = f"create database db vgroups {self.vgroups1} replica 3"
tdSql.execute(sql, 5, True)
sql = f"create table db.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
tdSql.execute(sql, 5, True)
sql = f"create sma index sma_test on db.st function(max(c1),max(c2),min(c1),min(c2)) {self.smaClause};"
tdLog.info(sql)
tdSql.error(sql)
# create database db
sql = f"create database @db_name vgroups {self.vgroups1} replica 1"
self.exeDouble(sql)
# create super talbe st
sql = f"create table @db_name.st(ts timestamp, c1 int, c2 bigint, ts1 timestamp) tags(area int)"
self.exeDouble(sql)
# create child table
for i in range(self.childCnt):
sql = f"create table @db_name.t{i} using @db_name.st tags({i}) "
self.exeDouble(sql)
# create sma index on db2
sql = f"use {self.db2}"
tdSql.execute(sql)
sql = f"create sma index sma_index_maxmin on {self.db2}.st function(max(c1),max(c2),min(c1),min(c2)) {self.smaClause};"
tdLog.info(sql)
tdSql.execute(sql)
# insert data
self.insertData()
# check data correct
def checkExpect(self, sql, expectVal):
tdSql.query(sql)
rowCnt = tdSql.getRows()
for i in range(rowCnt):
val = tdSql.getData(i,0)
if val != expectVal:
tdLog.exit(f"Not expect . query={val} expect={expectVal} i={i} sql={sql}")
return False
tdLog.info(f"check expect ok. sql={sql} expect ={expectVal} rowCnt={rowCnt}")
return True
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
# check query result same
def queryDoubleImpl(self, sql):
# sql
sql1 = sql.replace('@db_name', self.db1)
tdLog.info(sql1)
start1 = time.time()
rows1 = tdSql.query(sql1)
spend1 = time.time() - start1
res1 = copy.copy(tdSql.queryResult)
sql2 = sql.replace('@db_name', self.db2)
tdLog.info(sql2)
start2 = time.time()
tdSql.query(sql2)
spend2 = time.time() - start2
res2 = tdSql.queryResult
rowlen1 = len(res1)
rowlen2 = len(res2)
if rowlen1 != rowlen2:
tdLog.info(f"check error. rowlen1={rowlen1} rowlen2={rowlen2} both not equal.")
return False
for i in range(rowlen1):
row1 = res1[i]
row2 = res2[i]
collen1 = len(row1)
collen2 = len(row2)
if collen1 != collen2:
tdLog.info(f"checkerror. collen1={collen1} collen2={collen2} both not equal.")
return False
for j in range(collen1):
if row1[j] != row2[j]:
tdLog.exit(f"col={j} col1={row1[j]} col2={row2[j]} both col not equal.")
return False
# warning performance
multiple = spend1/spend2
tdLog.info("spend1=%.6fs spend2=%.6fs multiple=%.1f"%(spend1, spend2, multiple))
if spend2 > spend1 and multiple < 4:
tdLog.info(f"performace not reached: multiple(spend1/spend)={multiple} require is >=4 ")
return False
return True
# check query result same
def queryDouble(self, sql, tryCount=60, gap=1):
for i in range(tryCount):
if self.queryDoubleImpl(sql):
return True
# error
tdLog.info(f"queryDouble return false, try loop={i}")
time.sleep(gap)
tdLog.exit(f"queryDouble try {tryCount} times, but all failed.")
return False
# check result
def checkResult(self):
# max
sql = f"select max(c1) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# min
sql = f"select max(c2) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# mix
sql = f"select max(c1),max(c2),min(c1),min(c2) from @db_name.st {self.smaClause}"
self.queryDouble(sql)
# run
def run(self):
# prepare env
self.prepareEnv()
# check two db query result same
tdLog.info(f"check have sma(db1) and no sma(db2) performace...")
self.checkResult()
# stop
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())