From 387e3a535f3e0433d6e8049dd064b5f151585179 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Aug 2023 15:20:50 +0800 Subject: [PATCH 1/5] docs:add varbinary type --- docs/en/12-taos-sql/01-data-type.md | 4 +- docs/en/12-taos-sql/10-function.md | 2 +- .../13-schemaless/13-schemaless.md | 8 +- docs/zh/12-taos-sql/01-data-type.md | 6 +- docs/zh/12-taos-sql/10-function.md | 2 +- .../13-schemaless/13-schemaless.md | 202 +++++++++++++++++- 6 files changed, 214 insertions(+), 10 deletions(-) diff --git a/docs/en/12-taos-sql/01-data-type.md b/docs/en/12-taos-sql/01-data-type.md index f81aaceca3..020eb27cfe 100644 --- a/docs/en/12-taos-sql/01-data-type.md +++ b/docs/en/12-taos-sql/01-data-type.md @@ -43,6 +43,8 @@ In TDengine, the data types below can be used when specifying a column or tag. | 15 | JSON | | JSON type can only be used on tags. A tag of json type is excluded with any other tags of any other type. | | 16 | VARCHAR | User-defined | Alias of BINARY | | 17 | GEOMETRY | User-defined | Geometry | +| 18 | VARBINARY | User-defined | Binary data with variable length + :::note - Each row of the table cannot be longer than 48KB (64KB since version 3.0.5.0) (note that each BINARY/NCHAR/GEOMETRY column takes up an additional 2 bytes of storage space). @@ -57,7 +59,7 @@ In TDengine, the data types below can be used when specifying a column or tag. | 3 | POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0)) | 13+3*16 | 13+4094*16 | +16 | - Numeric values in SQL statements will be determined as integer or float type according to whether there is decimal point or whether scientific notation is used, so attention must be paid to avoid overflow. For example, 9999999999999999999 will be considered as overflow because it exceeds the upper limit of long integer, but 9999999999999999999.0 will be considered as a legal float number. - +- VARBINARY is a data type that stores binary data, with a maximum length of 65517 bytes and a maximum length of 16382 bytes for tag columns. Binary data can be written through SQL or schemaless (which needs to be converted to a string starting with \x), or written through stmt (which can directly use binary). Display starting with hexadecimal starting with \x. ::: ## Constants diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 234625bfb4..ad64ae9fbe 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -402,7 +402,7 @@ CAST(expr AS type_name) **Return value type**: The type specified by parameter `type_name` -**Applicable data types**: All data types except JSON +**Applicable data types**: All data types except JSON and VARBINARY. If type_name is VARBINARY, expr can only be VARCHAR. **Nested query**: It can be used in both the outer query and inner query in a nested query. diff --git a/docs/en/14-reference/13-schemaless/13-schemaless.md b/docs/en/14-reference/13-schemaless/13-schemaless.md index 54be18eea3..d4b6606ac5 100644 --- a/docs/en/14-reference/13-schemaless/13-schemaless.md +++ b/docs/en/14-reference/13-schemaless/13-schemaless.md @@ -32,8 +32,10 @@ All data in tag_set is automatically converted to the NCHAR data type and does n In the schemaless writing data line protocol, each data item in the field_set needs to be described with its data type. Let's explain in detail: -- If there are English double quotes on both sides, it indicates the BINARY(32) type. For example, `"abc"`. -- If there are double quotes on both sides and an L prefix, it means NCHAR(32) type. For example, `L"error message"`. +- If there are English double quotes on both sides, it indicates the VARCHAR type. For example, `"abc"`. +- If there are double quotes on both sides and a L/l prefix, it means NCHAR type. For example, `L"error message"`. +- If there are double quotes on both sides and a G/g prefix, it means GEOMETRY type. For example `G"Point(4.343 89.342)"`. +- If there are double quotes on both sides and a B/b prefix, it means VARBINARY type. Hexadecimal start with \x or string can be used in double quotes. For example `B"\x98f46e"` `B"hello"`. - Spaces, equals sign (=), comma (,), double quote ("), and backslash (\\) need to be escaped with a backslash (\\) in front. (All refer to the ASCII character). The rules are as follows: | **Serial number** | **Element** | **Escape characters** | @@ -110,7 +112,7 @@ You can configure smlChildTableName in taos.cfg to specify table names, for exam Note: TDengine 3.0.3.0 and later automatically detect whether order is consistent. This parameter is no longer used. 9. Due to the fact that SQL table names do not support period (.), schemaless has also processed period (.). If there is a period (.) in the table name automatically created by schemaless, it will be automatically replaced with an underscore (\_). If you manually specify a sub table name, if there is a dot (.) in the sub table name, it will also be converted to an underscore (\_) 10. Taos.cfg adds the configuration of smlTsDefaultName (with a string value), which only works on the client side. After configuration, the time column name of the schemaless automatic table creation can be set through this configuration. If not configured, defaults to _ts. - +11. Super table name or child table name are case sensitive. :::tip All processing logic of schemaless will still follow TDengine's underlying restrictions on data structures, such as the total length of each row of data cannot exceed 48 KB(64 KB since version 3.0.5.0) and the total length of a tag value cannot exceed 16 KB. See [TDengine SQL Boundary Limits](/taos-sql/limit) for specific constraints in this area. ::: diff --git a/docs/zh/12-taos-sql/01-data-type.md b/docs/zh/12-taos-sql/01-data-type.md index 1df07e7e7f..82ddb630a7 100644 --- a/docs/zh/12-taos-sql/01-data-type.md +++ b/docs/zh/12-taos-sql/01-data-type.md @@ -42,11 +42,12 @@ CREATE DATABASE db_name PRECISION 'ns'; | 14 | NCHAR | 自定义 | 记录包含多字节字符在内的字符串,如中文字符。每个 NCHAR 字符占用 4 字节的存储空间。字符串两端使用单引号引用,字符串内的单引号需用转义字符 `\'`。NCHAR 使用时须指定字符串大小,类型为 NCHAR(10) 的列表示此列的字符串最多存储 10 个 NCHAR 字符。如果用户字符串长度超出声明长度,将会报错。 | | 15 | JSON | | JSON 数据类型, 只有 Tag 可以是 JSON 格式 | | 16 | VARCHAR | 自定义 | BINARY 类型的别名 | -| 17 | GEOMETRY | 自定义 | 几何类型 | +| 17 | GEOMETRY | 自定义 | 几何类型 +| 18 | VARBINARY | 自定义 | 可变长的二进制数据| :::note -- 表的每行长度不能超过 48KB(从 3.0.5.0 版本开始为 64KB)(注意:每个 BINARY/NCHAR/GEOMETRY 类型的列还会额外占用 2 个字节的存储位置)。 +- 表的每行长度不能超过 48KB(从 3.0.5.0 版本开始为 64KB)(注意:每个 BINARY/NCHAR/GEOMETRY/VARBINARY 类型的列还会额外占用 2 个字节的存储位置)。 - 虽然 BINARY 类型在底层存储上支持字节型的二进制字符,但不同编程语言对二进制数据的处理方式并不保证一致,因此建议在 BINARY 类型中只存储 ASCII 可见字符,而避免存储不可见字符。多字节的数据,例如中文字符,则需要使用 NCHAR 类型进行保存。如果强行使用 BINARY 类型保存中文字符,虽然有时也能正常读写,但并不带有字符集信息,很容易出现数据乱码甚至数据损坏等情况。 - BINARY 类型理论上最长可以有 16,374(从 3.0.5.0 版本开始,数据列为 65,517,标签列为 16,382) 字节。BINARY 仅支持字符串输入,字符串两端需使用单引号引用。使用时须指定大小,如 BINARY(20) 定义了最长为 20 个单字节字符的字符串,每个字符占 1 字节的存储空间,总共固定占用 20 字节的空间,此时如果用户字符串超出 20 字节将会报错。对于字符串内的单引号,可以用转义字符反斜线加单引号来表示,即 `\'`。 - GEOMETRY 类型数据列为最大长度为 65,517 字节,标签列最大长度为 16,382 字节。支持 2D 的 POINT、LINESTRING 和 POLYGON 子类型数据。长度计算方式如下表所示: @@ -58,6 +59,7 @@ CREATE DATABASE db_name PRECISION 'ns'; | 3 | POLYGON((1.0 1.0, 2.0 2.0, 1.0 1.0)) | 13+3*16 | 13+4094*16 | +16 | - SQL 语句中的数值类型将依据是否存在小数点,或使用科学计数法表示,来判断数值类型是否为整型或者浮点型,因此在使用时要注意相应类型越界的情况。例如,9999999999999999999 会认为超过长整型的上边界而溢出,而 9999999999999999999.0 会被认为是有效的浮点数。 +- VARBINARY 是一种存储二进制数据的数据类型,最大长度为 65,517 字节,标签列最大长度为 16,382 字节。可以通过sql或schemaless方式写入二进制数据(需要转换为\x开头的字符串写入),也可以通过stmt方式写入(可以直接使用二进制)。显示时通过16进制\x开头。 ::: diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 3c0ee06caf..9e9d961f6d 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -402,7 +402,7 @@ CAST(expr AS type_name) **返回结果类型**:CAST 中指定的类型(type_name)。 -**适用数据类型**:输入参数 expression 的类型可以是除JSON外的所有类型。 +**适用数据类型**:输入参数 expr 的类型可以是除JSON和VARBINARY外的所有类型。如果 type_name 为 VARBINARY,则 expr 只能是 VARCHAR 类型。 **嵌套子查询支持**:适用于内层查询和外层查询。 diff --git a/docs/zh/14-reference/13-schemaless/13-schemaless.md b/docs/zh/14-reference/13-schemaless/13-schemaless.md index 9f5bae081c..c845305d38 100644 --- a/docs/zh/14-reference/13-schemaless/13-schemaless.md +++ b/docs/zh/14-reference/13-schemaless/13-schemaless.md @@ -33,8 +33,205 @@ tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要 在无模式写入数据行协议中,field_set 中的每个数据项都需要对自身的数据类型进行描述。具体来说: -- 如果两边有英文双引号,表示 BINARY(32) 类型。例如 `"abc"`。 -- 如果两边有英文双引号而且带有 L 前缀,表示 NCHAR(32) 类型。例如 `L"报错信息"`。 +- 如果两边有英文双引号,表示 BI--- + title: Schemaless 写入 + description: 'Schemaless 写入方式,可以免于预先创建超级表/子表的步骤,随着数据写入接口能够自动创建与数据对应的存储结构' +--- + +在物联网应用中,常会采集比较多的数据项,用于实现智能控制、业务分析、设备监控等。由于应用逻辑的版本升级,或者设备自身的硬件调整等原因,数据采集项就有可能比较频繁地出现变动。为了在这种情况下方便地完成数据记录工作,TDengine 提供调用 Schemaless 写入方式,可以免于预先创建超级表/子表的步骤,随着数据写入接口能够自动创建与数据对应的存储结构。并且在必要时,Schemaless +将自动增加必要的数据列,保证用户写入的数据可以被正确存储。 + +无模式写入方式建立的超级表及其对应的子表与通过 SQL 直接建立的超级表和子表完全没有区别,你也可以通过,SQL 语句直接向其中写入数据。需要注意的是,通过无模式写入方式建立的表,其表名是基于标签值按照固定的映射规则生成,所以无法明确地进行表意,缺乏可读性。 + +注意:无模式写入会自动建表,不需要手动建表,手动建表的话可能会出现未知的错误。 + +## 无模式写入行协议 + +TDengine 的无模式写入的行协议兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议、OpenTSDB 的 JSON 格式协议。但是使用这三种协议的时候,需要在 API 中指定输入内容使用解析协议的标准。 + +对于 InfluxDB、OpenTSDB 的标准写入协议请参考各自的文档。下面首先以 InfluxDB 的行协议为基础,介绍 TDengine 扩展的协议内容,允许用户采用更加精细的方式控制(超级表)模式。 + +Schemaless 采用一个字符串来表达一个数据行(可以向写入 API 中一次传入多行字符串来实现多个数据行的批量写入),其格式约定如下: + +```json +measurement,tag_set field_set timestamp +``` + +其中: + +- measurement 将作为数据表名。它与 tag_set 之间使用一个英文逗号来分隔。 +- tag_set 将作为标签数据,其格式形如 `=,=`,也即可以使用英文逗号来分隔多个标签数据。它与 field_set 之间使用一个半角空格来分隔。 +- field_set 将作为普通列数据,其格式形如 `=,=`,同样是使用英文逗号来分隔多个普通列的数据。它与 timestamp 之间使用一个半角空格来分隔。 +- timestamp 即本行数据对应的主键时间戳。 + +tag_set 中的所有的数据自动转化为 nchar 数据类型,并不需要使用双引号(")。 + +在无模式写入数据行协议中,field_set 中的每个数据项都需要对自身的数据类型进行描述。具体来说: + +- 如果两边有英文双引号,表示 VARCHAR 类型。例如 `"abc"`。 +- 如果两边有英文双引号而且带有 L或l 前缀,表示 NCHAR 类型。例如 `L"报错信息"`。 +- 如果两边有英文双引号而且带有 G或g 前缀,表示 GEOMETRY 类型。例如 `G"Point(4.343 89.342)"`。 +- 如果两边有英文双引号而且带有 B或b 前缀,表示 VARBINARY 类型,双引号内可以为\x开头的16进制或者字符串。例如 `B"\x98f46e"` `B"hello"`。 +- 对空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义。(都指的是英文半角符号)。具体转义规则如下: + +| **序号** | **域** | **需转义字符** | +| -------- | ----------- | ----------------------------- | +| 1 | 超级表名 | 逗号,空格 | +| 2 | 标签名 | 逗号,等号,空格 | +| 3 | 标签值 | 逗号,等号,空格 | +| 4 | 列名 | 逗号,等号,空格 | +| 5 | 列值 | 双引号,反斜杠 | + +两个连续的反斜杠,第一个作为转义符,只有一个反斜杠则无需转义. 反斜杠转义规则举例如下: + +| **序号** | **反斜杠** | **转义为** | +| -------- | ----------- | ----------------------------- | +| 1 | \ | \ | +| 2 | \\\\ | \ | +| 3 | \\\\\\ | \\\\ | +| 4 | \\\\\\\\ | \\\\ | +| 5 | \\\\\\\\\\ | \\\\\\ | +| 6 | \\\\\\\\\\\\ | \\\\\\ | + +- 数值类型将通过后缀来区分数据类型: + +| **序号** | **后缀** | **映射类型** | **大小(字节)** | +| -------- | ----------- | ----------------------------- | -------------- | +| 1 | 无或 f64 | double | 8 | +| 2 | f32 | float | 4 | +| 3 | i8/u8 | TinyInt/UTinyInt | 1 | +| 4 | i16/u16 | SmallInt/USmallInt | 2 | +| 5 | i32/u32 | Int/UInt | 4 | +| 6 | i64/i/u64/u | BigInt/BigInt/UBigInt/UBigInt | 8 | + +- t, T, true, True, TRUE, f, F, false, False 将直接作为 BOOL 型来处理。 + +例如如下数据行表示:向名为 st 的超级表下的 t1 标签为 "3"(NCHAR)、t2 标签为 "4"(NCHAR)、t3 +标签为 "t3"(NCHAR)的数据子表,写入 c1 列为 3(BIGINT)、c2 列为 false(BOOL)、c3 +列为 "passit"(BINARY)、c4 列为 4(DOUBLE)、主键时间戳为 1626006833639000000 的一行数据。 + +```json +st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000 +``` + +需要注意的是,如果描述数据类型后缀时使用了错误的大小写,或者为数据指定的数据类型有误,均可能引发报错提示而导致数据写入失败。 + +## 无模式写入的主要处理逻辑 + +无模式写入按照如下原则来处理行数据: + +1. 将使用如下规则来生成子表名:首先将 measurement 的名称和标签的 key 和 value 组合成为如下的字符串 + +```json +"measurement,tag_key1=tag_value1,tag_key2=tag_value2" +``` + +:::tip +需要注意的是,这里的 tag_key1, tag_key2 并不是用户输入的标签的原始顺序,而是使用了标签名称按照字符串升序排列后的结果。所以,tag_key1 并不是在行协议中输入的第一个标签。 +排列完成以后计算该字符串的 MD5 散列值 "md5_val"。然后将计算的结果与字符串组合生成表名:“t_md5_val”。其中的 “t_” 是固定的前缀,每个通过该映射关系自动生成的表都具有该前缀。 +:::tip +为了让用户可以指定生成的表名,可以通过在taos.cfg里配置 smlChildTableName 参数来指定。 +举例如下:配置 smlChildTableName=tname 插入数据为 st,tname=cpu1,t1=4 c1=3 1626006833639000000 则创建的表名为 cpu1,注意如果多行数据 tname 相同,但是后面的 tag_set 不同,则使用第一行自动建表时指定的 tag_set,其他的行会忽略)。 + +2. 如果解析行协议获得的超级表不存在,则会创建这个超级表(不建议手动创建超级表,不然插入数据可能异常)。 +3. 如果解析行协议获得子表不存在,则 Schemaless 会按照步骤 1 或 2 确定的子表名来创建子表。 +4. 如果数据行中指定的标签列或普通列不存在,则在超级表中增加对应的标签列或普通列(只增不减)。 +5. 如果超级表中存在一些标签列或普通列未在一个数据行中被指定取值,那么这些列的值在这一行中会被置为 + NULL。 +6. 对 BINARY 或 NCHAR 列,如果数据行中所提供值的长度超出了列类型的限制,自动增加该列允许存储的字符长度上限(只增不减),以保证数据的完整保存。 +7. 整个处理过程中遇到的错误会中断写入过程,并返回错误代码。 +8. 为了提高写入的效率,默认假设同一个超级表中 field_set 的顺序是一样的(第一条数据包含所有的 field,后面的数据按照这个顺序),如果顺序不一样,需要配置参数 smlDataFormat 为 false,否则,数据写入按照相同顺序写入,库中数据会异常,从3.0.3.0开始,自动检测顺序是否一致,该配置废弃。 +9. 由于sql建表表名不支持点号(.),所以schemaless也对点号(.)做了处理,如果schemaless自动建表的表名如果有点号(.),会自动替换为下划线(\_)。如果手动指定子表名的话,子表名里有点号(.),同样转化为下划线(\_)。 +10. taos.cfg 增加 smlTsDefaultName 配置(值为字符串),只在client端起作用,配置后,schemaless自动建表的时间列名字可以通过该配置设置。不配置的话,默认为 _ts +11. 无模式写入的数据超级表或子表名区分大小写 + +:::tip +无模式所有的处理逻辑,仍会遵循 TDengine 对数据结构的底层限制,例如每行数据的总长度不能超过 +48KB(从 3.0.5.0 版本开始为 64KB),标签值的总长度不超过16KB。这方面的具体限制约束请参见 [TDengine SQL 边界限制](/taos-sql/limit) + +::: + +## 时间分辨率识别 + +无模式写入过程中支持三个指定的模式,具体如下 + +| **序号** | **值** | **说明** | +| -------- | ------------------- | ------------------------------- | +| 1 | SML_LINE_PROTOCOL | InfluxDB 行协议(Line Protocol) | +| 2 | SML_TELNET_PROTOCOL | OpenTSDB 文本行协议 | +| 3 | SML_JSON_PROTOCOL | JSON 协议格式 | + +在 SML_LINE_PROTOCOL 解析模式下,需要用户指定输入的时间戳的时间分辨率。可用的时间分辨率如下表所示: + +| **序号** | **时间分辨率定义** | **含义** | +| -------- | --------------------------------- | -------------- | +| 1 | TSDB_SML_TIMESTAMP_NOT_CONFIGURED | 未定义(无效) | +| 2 | TSDB_SML_TIMESTAMP_HOURS | 小时 | +| 3 | TSDB_SML_TIMESTAMP_MINUTES | 分钟 | +| 4 | TSDB_SML_TIMESTAMP_SECONDS | 秒 | +| 5 | TSDB_SML_TIMESTAMP_MILLI_SECONDS | 毫秒 | +| 6 | TSDB_SML_TIMESTAMP_MICRO_SECONDS | 微秒 | +| 7 | TSDB_SML_TIMESTAMP_NANO_SECONDS | 纳秒 | + +在 SML_TELNET_PROTOCOL 和 SML_JSON_PROTOCOL 模式下,根据时间戳的长度来确定时间精度(与 OpenTSDB 标准操作方式相同),此时会忽略用户指定的时间分辨率。 + +## 数据模式映射规则 + +本节将说明 InfluxDB 行协议(Line Protocol)的数据如何映射成为具有模式的数据。每个行协议中数据 measurement 映射为 +超级表名称。tag_set 中的 标签名称为 数据模式中的标签名,field_set 中的名称为列名称。以如下数据为例,说明映射规则: + +```json +st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000 +``` + +该行数据映射生成一个超级表: st, 其包含了 3 个类型为 nchar 的标签,分别是:t1, t2, t3。五个数据列,分别是 ts(timestamp),c1 (bigint),c3(binary),c2 (bool), c4 (bigint)。映射成为如下 SQL 语句: + +```json +create stable st (_ts timestamp, c1 bigint, c2 bool, c3 binary(6), c4 bigint) tags(t1 nchar(1), t2 nchar(1), t3 nchar(2)) +``` + +## 数据模式变更处理 + +本节将说明不同行数据写入情况下,对于数据模式的影响。 + +在使用行协议写入一个明确的标识的字段类型的时候,后续更改该字段的类型定义,会出现明确的数据模式错误,即会触发写入 API 报告错误。如下所示, + +```json +st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4 1626006833639000000 +st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4i 1626006833640000000 +``` + +第一行的数据类型映射将 c4 列定义为 Double, 但是第二行的数据又通过数值后缀方式声明该列为 BigInt, 由此会触发无模式写入的解析错误。 + +如果列前面的行协议将数据列声明为了 binary, 后续的要求长度更长的 binary 长度,此时会触发超级表模式的变更。 + +```json +st,t1=3,t2=4,t3=t3 c1=3i64,c5="pass" 1626006833639000000 +st,t1=3,t2=4,t3=t3 c1=3i64,c5="passit" 1626006833640000000 +``` + +第一行中行协议解析会声明 c5 列是一个 binary(4)的字段,第二次行数据写入会提取列 c5 仍然是 binary 列,但是其宽度为 6,此时需要将 binary 的宽度增加到能够容纳 新字符串的宽度。 + +```json +st,t1=3,t2=4,t3=t3 c1=3i64 1626006833639000000 +st,t1=3,t2=4,t3=t3 c1=3i64,c6="passit" 1626006833640000000 +``` + +第二行数据相对于第一行来说增加了一个列 c6,类型为 binary(6)。那么此时会自动增加一个列 c6, 类型为 binary(6)。 + +## 写入完整性 + +TDengine 提供数据写入的幂等性保证,即您可以反复调用 API 进行出错数据的写入操作。但是不提供多行数据写入的原子性保证。即在多行数据一批次写入过程中,会出现部分数据写入成功,部分数据写入失败的情况。 + +## 错误码 + +如果是无模式写入过程中的数据本身错误,应用会得到 TSDB_CODE_TSC_LINE_SYNTAX_ERROR +错误信息,该错误信息表明错误发生在写入文本中。其他的错误码与原系统一致,可以通过 +taos_errstr 获取具体的错误原因。 +NARY(32) 类型。例如 `"abc"`。 +- 如果两边有英文双引号而且带有 L或l 前缀,表示 NCHAR 类型。例如 `L"报错信息"`。 +- 如果两边有英文双引号而且带有 G或g 前缀,表示 Geometry 类型。例如 `G"Point(4.343 89.342)"`。 +- 如果两边有英文双引号而且带有 B或b 前缀,表示 VARBINARY 类型 - 对空格、等号(=)、逗号(,)、双引号(")、反斜杠(\),前面需要使用反斜杠(\)进行转义。(都指的是英文半角符号)。具体转义规则如下: | **序号** | **域** | **需转义字符** | @@ -106,6 +303,7 @@ st,t1=3,t2=4,t3=t3 c1=3i64,c3="passit",c2=false,c4=4f64 1626006833639000000 8. 为了提高写入的效率,默认假设同一个超级表中 field_set 的顺序是一样的(第一条数据包含所有的 field,后面的数据按照这个顺序),如果顺序不一样,需要配置参数 smlDataFormat 为 false,否则,数据写入按照相同顺序写入,库中数据会异常,从3.0.3.0开始,自动检测顺序是否一致,该配置废弃。 9. 由于sql建表表名不支持点号(.),所以schemaless也对点号(.)做了处理,如果schemaless自动建表的表名如果有点号(.),会自动替换为下划线(\_)。如果手动指定子表名的话,子表名里有点号(.),同样转化为下划线(\_)。 10. taos.cfg 增加 smlTsDefaultName 配置(值为字符串),只在client端起作用,配置后,schemaless自动建表的时间列名字可以通过该配置设置。不配置的话,默认为 _ts +11. 无模式写入的数据超级表或子表名区分大小写 :::tip 无模式所有的处理逻辑,仍会遵循 TDengine 对数据结构的底层限制,例如每行数据的总长度不能超过 From be893c1bb03db059058b24f0850e11dff4798076 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Aug 2023 18:21:37 +0800 Subject: [PATCH 2/5] fix:modify error tips --- source/libs/parser/src/parUtil.c | 2 +- source/util/src/terror.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index b3ec6a6ef6..d4ddbe17ef 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY: return "Primary timestamp column cannot be dropped"; case TSDB_CODE_PAR_INVALID_MODIFY_COL: - return "Only binary/nchar/geometry column length could be modified, and the length can only be increased, not decreased"; + return "Only varbinary/binary/nchar/geometry column length could be modified, and the length can only be increased, not decreased"; case TSDB_CODE_PAR_INVALID_TBNAME: return "Invalid tbname pseudo column"; case TSDB_CODE_PAR_INVALID_FUNCTION_NAME: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 998993bf65..e271592a01 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -545,7 +545,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PASSWD, "Invalid password") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid alter table statement") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar/geometry column length could be modified, and the length can only be increased, not decreased") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only varbinary/binary/nchar/geometry column length could be modified, and the length can only be increased, not decreased") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TBNAME, "Invalid tbname pseudo column") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FUNCTION_NAME, "Invalid function name") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COMMENT_TOO_LONG, "Comment too long") From 22f0778ccab24779565bf972608ec578a7f871ef Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 31 Aug 2023 09:26:44 +0800 Subject: [PATCH 3/5] typo and remove ip port --- source/dnode/mnode/impl/src/mndDnode.c | 10 +++++----- source/dnode/mnode/impl/src/mndTopic.c | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 949d41ef07..115c33cff1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1062,16 +1062,16 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - char obj1[150] = {0}; - sprintf(obj1, "%s:%d", dropReq.fqdn, dropReq.port); + char obj1[30] = {0}; + sprintf(obj1, "%d", dropReq.dnodeId); - char obj2[30] = {0}; - sprintf(obj2, "%d", dropReq.dnodeId); + //char obj2[150] = {0}; + //sprintf(obj2, "%s:%d", dropReq.fqdn, dropReq.port); char detail[100] = {0}; sprintf(detail, "force:%d, unsafe:%d", dropReq.force, dropReq.unsafe); - auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, obj2, detail); + auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, "", detail); _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 1e4299a59e..fc52e4657e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -652,7 +652,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB); //reuse this function for topic - auditRecord(pReq, pMnode->clusterId, "crateTopic", topicName.dbname, dbname.dbname, detail); + auditRecord(pReq, pMnode->clusterId, "createTopic", topicName.dbname, dbname.dbname, detail); _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { From 05ca71d5dead0dbd8bddb5a542dfa4c74fe48a8d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 09:28:43 +0800 Subject: [PATCH 4/5] stream change ver --- include/libs/stream/tstream.h | 10 ++-- source/dnode/mnode/impl/inc/mndDef.h | 62 +++++++++++++------------ source/dnode/mnode/impl/src/mndDef.c | 4 ++ source/dnode/mnode/impl/src/mndStream.c | 29 ++++++------ source/libs/stream/src/streamTask.c | 4 +- 5 files changed, 61 insertions(+), 48 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1b3960bdba..ab9f606fe3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,7 +31,7 @@ extern "C" { typedef struct SStreamTask SStreamTask; -#define SSTREAM_TASK_VER 1 +#define SSTREAM_TASK_VER 2 enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, @@ -321,7 +321,7 @@ typedef struct { struct SStreamTask { int64_t ver; - SStreamTaskId id; + SStreamTaskId id; SSTaskBasicInfo info; STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; @@ -329,8 +329,8 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SHistDataRange dataRange; - SStreamTaskId historyTaskId; - SStreamTaskId streamTaskId; + SStreamTaskId historyTaskId; + SStreamTaskId streamTaskId; int32_t nextCheckId; SArray* checkpointInfo; // SArray STaskTimestamp tsInfo; @@ -371,6 +371,8 @@ struct SStreamTask { int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; + + char reserve[256]; }; typedef struct SMetaHbInfo { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c4c0ea238d..1bf13c8fb5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -615,25 +615,25 @@ void tDeleteSubscribeObj(SMqSubscribeObj* pSub); int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub); void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver); -//typedef struct { -// int32_t epoch; -// SArray* consumers; // SArray -//} SMqSubActionLogEntry; +// typedef struct { +// int32_t epoch; +// SArray* consumers; // SArray +// } SMqSubActionLogEntry; -//SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); -//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); -//int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry); -//void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry); +// SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); +// void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry); +// int32_t tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry); +// void* tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry); // -//typedef struct { -// char key[TSDB_SUBSCRIBE_KEY_LEN]; -// SArray* logs; // SArray -//} SMqSubActionLogObj; +// typedef struct { +// char key[TSDB_SUBSCRIBE_KEY_LEN]; +// SArray* logs; // SArray +// } SMqSubActionLogObj; // -//SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog); -//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); -//int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); -//void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); +// SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog); +// void tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog); +// int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog); +// void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); typedef struct { int32_t oldConsumerNum; @@ -647,12 +647,12 @@ typedef struct { } SMqRebOutputVg; typedef struct { - SArray* rebVgs; // SArray - SArray* newConsumers; // SArray - SArray* removedConsumers; // SArray - SArray* modifyConsumers; // SArray - SMqSubscribeObj* pSub; -// SMqSubActionLogEntry* pLogEntry; + SArray* rebVgs; // SArray + SArray* newConsumers; // SArray + SArray* removedConsumers; // SArray + SArray* modifyConsumers; // SArray + SMqSubscribeObj* pSub; + // SMqSubActionLogEntry* pLogEntry; } SMqRebOutputObj; typedef struct SStreamConf { @@ -674,8 +674,8 @@ typedef struct { int32_t totalLevel; int64_t smaId; // 0 for unused // info - int64_t uid; - int8_t status; + int64_t uid; + int8_t status; SStreamConf conf; // source and target int64_t sourceDbUid; @@ -690,13 +690,13 @@ typedef struct { int32_t fixedSinkVgId; // 0 for shuffle // transformation - char* sql; - char* ast; - char* physicalPlan; - SArray* tasks; // SArray> + char* sql; + char* ast; + char* physicalPlan; + SArray* tasks; // SArray> - SArray* pHTasksList; // generate the results for already stored ts data - int64_t hTaskUid; // stream task for history ts data + SArray* pHTasksList; // generate the results for already stored ts data + int64_t hTaskUid; // stream task for history ts data SSchemaWrapper outputSchema; SSchemaWrapper tagSchema; @@ -709,6 +709,8 @@ typedef struct { // 3.0.5. int64_t checkpointId; + char reserve[256]; + } SStreamObj; int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6bf4015852..a5c768a018 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -84,6 +84,8 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { // 3.0.50 ver = 3 if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve)) < 0) return -1; + tEndEncode(pEncoder); return pEncoder->pos; } @@ -157,6 +159,8 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { if (sver >= 3) { if (tDecodeI64(pDecoder, &pObj->checkpointId) < 0) return -1; } + if (tDecodeCStrTo(pDecoder, pObj->reserve) < 0) return -1; + tEndDecode(pDecoder); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f4d52556e..46eb0d9957 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -14,6 +14,7 @@ */ #include "mndStream.h" +#include "audit.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -28,9 +29,8 @@ #include "parser.h" #include "tmisce.h" #include "tname.h" -#include "audit.h" -#define MND_STREAM_VER_NUMBER 3 +#define MND_STREAM_VER_NUMBER 4 #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_MAX_NUM 60 #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" @@ -874,15 +874,18 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { code = TSDB_CODE_ACTION_IN_PROGRESS; char detail[2000] = {0}; - sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 ", " + sprintf(detail, + "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 + ", " "fillHistory:%d, igExists:%d, " - "igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", " - "maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, " + "igExpired:%d, igUpdate:%d, lastTs:%" PRId64 + ", " + "maxDelay:%" PRId64 + ", numOfTags:%d, sourceDB:%s, " "targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, - createStreamReq.fillHistory, createStreamReq.igExists, - createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs, - createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, + createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, + createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark); auditRecord(pReq, pMnode->clusterId, "createStream", createStreamReq.name, "", detail); @@ -2301,12 +2304,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { doExtractTasksFromStream(pMnode); } - for(int32_t i = 0; i < req.numOfTasks; ++i) { - STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + for (int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t index = *(int32_t *)taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); - STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + STaskStatusEntry *pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); pStatusEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%x not in ready status:%s", p->taskId, streamGetTaskStatusStr(p->status)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 01dcb435c0..dc8c509f1e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -134,6 +134,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve)) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; @@ -245,6 +246,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; tEndDecode(pDecoder); return 0; @@ -483,7 +485,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { pTask->status.taskStatus = TASK_STATUS__STOP; qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS); - while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */!streamTaskIsIdle(pTask)) { + while (/*pTask->status.schedStatus != TASK_SCHED_STATUS__INACTIVE */ !streamTaskIsIdle(pTask)) { qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel); taosMsleep(100); } From 83a5e2be4c435f3fe178c95bbc3f836cce48a227 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 11:42:12 +0800 Subject: [PATCH 5/5] stream change ver --- include/libs/stream/tstream.h | 3 +- source/dnode/mnode/impl/src/mndDef.c | 2 +- source/libs/stream/src/streamTask.c | 72 ++++++++++++++-------------- 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ab9f606fe3..477d35e3e2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -371,8 +371,7 @@ struct SStreamTask { int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; - - char reserve[256]; + char reserve[256]; }; typedef struct SMetaHbInfo { diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a5c768a018..d01daee5a7 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -84,7 +84,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { // 3.0.50 ver = 3 if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve)) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc8c509f1e..6eb09b95ec 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -134,47 +134,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve)) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { - int64_t ver; - int64_t skip64; - int8_t skip8; - int32_t skip32; - int16_t skip16; - SEpSet epSet; - - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &ver) < 0) return -1; - - if (ver != SSTREAM_TASK_VER) return -1; - - if (tDecodeI64(pDecoder, &skip64) < 0) return -1; - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - if (tDecodeI16(pDecoder, &skip16) < 0) return -1; - - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; - - if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1; - - tEndDecode(pDecoder); - return 0; -} - int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; @@ -252,6 +217,41 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { return 0; } +int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { + int64_t ver; + int64_t skip64; + int8_t skip8; + int32_t skip32; + int16_t skip16; + SEpSet epSet; + + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &ver) < 0) return -1; + + if (ver != SSTREAM_TASK_VER) return -1; + + if (tDecodeI64(pDecoder, &skip64) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI16(pDecoder, &skip16) < 0) return -1; + + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; + + if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + static void freeItem(void* p) { SStreamContinueExecInfo* pInfo = p; rpcFreeCont(pInfo->msg.pCont);