Merge branch 'main' into feat/TD-26127-audit-sql

This commit is contained in:
dm chen 2023-09-19 14:39:20 +08:00 committed by GitHub
commit ee913cbfbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 511 additions and 216 deletions

View File

@ -32,10 +32,10 @@ Provides information about dnodes. Similar to SHOW DNODES. Users whose SYSINFO a
| --- | :------------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- | | --- | :------------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1 | vnodes | SMALLINT | Current number of vnodes on the dnode. It should be noted that `vnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 1 | vnodes | SMALLINT | Current number of vnodes on the dnode. It should be noted that `vnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 2 | support_vnodes | SMALLINT | Maximum number of vnodes on the dnode | | 2 | support_vnodes | SMALLINT | Maximum number of vnodes on the dnode |
| 3 | status | BINARY(10) | Current status | | 3 | status | VARCHAR(10) | Current status |
| 4 | note | BINARY(256) | Reason for going offline or other information | | 4 | note | VARCHAR(256) | Reason for going offline or other information |
| 5 | id | SMALLINT | Dnode ID | | 5 | id | SMALLINT | Dnode ID |
| 6 | endpoint | BINARY(134) | Dnode endpoint | | 6 | endpoint | VARCHAR(134) | Dnode endpoint |
| 7 | create | TIMESTAMP | Creation time | | 7 | create | TIMESTAMP | Creation time |
## INS_MNODES ## INS_MNODES
@ -45,8 +45,8 @@ Provides information about mnodes. Similar to SHOW MNODES. Users whose SYSINFO a
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | ------------------------------------------ | | --- | :---------: | ------------- | ------------------------------------------ |
| 1 | id | SMALLINT | Mnode ID | | 1 | id | SMALLINT | Mnode ID |
| 2 | endpoint | BINARY(134) | Mnode endpoint | | 2 | endpoint | VARCHAR(134) | Mnode endpoint |
| 3 | role | BINARY(10) | Current role | | 3 | role | VARCHAR(10) | Current role |
| 4 | role_time | TIMESTAMP | Time at which the current role was assumed | | 4 | role_time | TIMESTAMP | Time at which the current role was assumed |
| 5 | create_time | TIMESTAMP | Creation time | | 5 | create_time | TIMESTAMP | Creation time |
@ -57,7 +57,17 @@ Provides information about qnodes. Similar to SHOW QNODES. Users whose SYSINFO a
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | --------------- | | --- | :---------: | ------------- | --------------- |
| 1 | id | SMALLINT | Qnode ID | | 1 | id | SMALLINT | Qnode ID |
| 2 | endpoint | BINARY(134) | Qnode endpoint | | 2 | endpoint | VARCHAR(134) | Qnode endpoint |
| 3 | create_time | TIMESTAMP | Creation time |
## INS_SNODES
Provides information about snodes. Similar to SHOW SNODES. Users whose SYSINFO attribute is 0 can't view this table.
| # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | --------------- |
| 1 | id | SMALLINT | Snode ID |
| 2 | endpoint | VARCHAR(134) | Snode endpoint |
| 3 | create_time | TIMESTAMP | Creation time | | 3 | create_time | TIMESTAMP | Creation time |
## INS_CLUSTER ## INS_CLUSTER
@ -67,7 +77,7 @@ Provides information about the cluster. Users whose SYSINFO attribute is 0 can't
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | --------------- | | --- | :---------: | ------------- | --------------- |
| 1 | id | BIGINT | Cluster ID | | 1 | id | BIGINT | Cluster ID |
| 2 | name | BINARY(134) | Cluster name | | 2 | name | VARCHAR(134) | Cluster name |
| 3 | create_time | TIMESTAMP | Creation time | | 3 | create_time | TIMESTAMP | Creation time |
## INS_DATABASES ## INS_DATABASES
@ -111,15 +121,15 @@ Provides information about user-defined functions.
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :-----------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- | | --- | :-----------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1 | name | BINARY(64) | Function name | | 1 | name | VARCHAR(64) | Function name |
| 2 | comment | BINARY(255) | Function description. It should be noted that `comment` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 2 | comment | VARCHAR(255) | Function description. It should be noted that `comment` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 3 | aggregate | INT | Whether the UDF is an aggregate function. It should be noted that `aggregate` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 3 | aggregate | INT | Whether the UDF is an aggregate function. It should be noted that `aggregate` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 4 | output_type | BINARY(31) | Output data type | | 4 | output_type | VARCHAR(31) | Output data type |
| 5 | create_time | TIMESTAMP | Creation time | | 5 | create_time | TIMESTAMP | Creation time |
| 6 | code_len | INT | Length of the source code | | 6 | code_len | INT | Length of the source code |
| 7 | bufsize | INT | Buffer size | | 7 | bufsize | INT | Buffer size |
| 8 | func_language | BINARY(31) | UDF programming language | | 8 | func_language | VARCHAR(31) | UDF programming language |
| 9 | func_body | BINARY(16384) | UDF function body | | 9 | func_body | VARCHAR(16384) | UDF function body |
| 10 | func_version | INT | UDF function version. starting from 0. Increasing by 1 each time it is updated | | 10 | func_version | INT | UDF function version. starting from 0. Increasing by 1 each time it is updated |
## INS_INDEXES ## INS_INDEXES
@ -128,12 +138,12 @@ Provides information about user-created indices. Similar to SHOW INDEX.
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :--------------: | ------------- | --------------------------------------------------------------------- | | --- | :--------------: | ------------- | --------------------------------------------------------------------- |
| 1 | db_name | BINARY(32) | Database containing the table with the specified index | | 1 | db_name | VARCHAR(32) | Database containing the table with the specified index |
| 2 | table_name | BINARY(192) | Table containing the specified index | | 2 | table_name | VARCHAR(192) | Table containing the specified index |
| 3 | index_name | BINARY(192) | Index name | | 3 | index_name | VARCHAR(192) | Index name |
| 4 | db_name | BINARY(64) | Index column | | 4 | db_name | VARCHAR(64) | Index column |
| 5 | index_type | BINARY(10) | SMA or tag index | | 5 | index_type | VARCHAR(10) | SMA or tag index |
| 6 | index_extensions | BINARY(256) | Other information For SMA/tag indices, this shows a list of functions | | 6 | index_extensions | VARCHAR(256) | Other information For SMA/tag indices, this shows a list of functions |
## INS_STABLES ## INS_STABLES
@ -141,16 +151,16 @@ Provides information about supertables.
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :-----------: | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | --- | :-----------: | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1 | stable_name | BINARY(192) | Supertable name | | 1 | stable_name | VARCHAR(192) | Supertable name |
| 2 | db_name | BINARY(64) | All databases in the supertable | | 2 | db_name | VARCHAR(64) | All databases in the supertable |
| 3 | create_time | TIMESTAMP | Creation time | | 3 | create_time | TIMESTAMP | Creation time |
| 4 | columns | INT | Number of columns | | 4 | columns | INT | Number of columns |
| 5 | tags | INT | Number of tags. It should be noted that `tags` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 5 | tags | INT | Number of tags. It should be noted that `tags` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 6 | last_update | TIMESTAMP | Last updated time | | 6 | last_update | TIMESTAMP | Last updated time |
| 7 | table_comment | BINARY(1024) | Table description | | 7 | table_comment | VARCHAR(1024) | Table description |
| 8 | watermark | BINARY(64) | Window closing time. It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 8 | watermark | VARCHAR(64) | Window closing time. It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 9 | max_delay | BINARY(64) | Maximum delay for pushing stream processing results. It should be noted that `max_delay` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 9 | max_delay | VARCHAR(64) | Maximum delay for pushing stream processing results. It should be noted that `max_delay` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 10 | rollup | BINARY(128) | Rollup aggregate function. It should be noted that `rollup` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 10 | rollup | VARCHAR(128) | Rollup aggregate function. It should be noted that `rollup` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
## INS_TABLES ## INS_TABLES
@ -158,37 +168,37 @@ Provides information about standard tables and subtables.
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :-----------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------- | | --- | :-----------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------- |
| 1 | table_name | BINARY(192) | Table name | | 1 | table_name | VARCHAR(192) | Table name |
| 2 | db_name | BINARY(64) | Database name | | 2 | db_name | VARCHAR(64) | Database name |
| 3 | create_time | TIMESTAMP | Creation time | | 3 | create_time | TIMESTAMP | Creation time |
| 4 | columns | INT | Number of columns | | 4 | columns | INT | Number of columns |
| 5 | stable_name | BINARY(192) | Supertable name | | 5 | stable_name | VARCHAR(192) | Supertable name |
| 6 | uid | BIGINT | Table ID | | 6 | uid | BIGINT | Table ID |
| 7 | vgroup_id | INT | Vgroup ID | | 7 | vgroup_id | INT | Vgroup ID |
| 8 | ttl | INT | Table time-to-live. It should be noted that `ttl` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 8 | ttl | INT | Table time-to-live. It should be noted that `ttl` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 9 | table_comment | BINARY(1024) | Table description | | 9 | table_comment | VARCHAR(1024) | Table description |
| 10 | type | BINARY(20) | Table type | | 10 | type | VARCHAR(20) | Table type |
## INS_TAGS ## INS_TAGS
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | --------------- | | --- | :---------: | ------------- | --------------- |
| 1 | table_name | BINARY(192) | Table name | | 1 | table_name | VARCHAR(192) | Table name |
| 2 | db_name | BINARY(64) | Database name | | 2 | db_name | VARCHAR(64) | Database name |
| 3 | stable_name | BINARY(192) | Supertable name | | 3 | stable_name | VARCHAR(192) | Supertable name |
| 4 | tag_name | BINARY(64) | Tag name | | 4 | tag_name | VARCHAR(64) | Tag name |
| 5 | tag_type | BINARY(64) | Tag type | | 5 | tag_type | VARCHAR(64) | Tag type |
| 6 | tag_value | BINARY(16384) | Tag value | | 6 | tag_value | VARCHAR(16384) | Tag value |
## INS_COLUMNS ## INS_COLUMNS
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :-----------: | ------------- | ---------------- | | --- | :-----------: | ------------- | ---------------- |
| 1 | table_name | BINARY(192) | Table name | | 1 | table_name | VARCHAR(192) | Table name |
| 2 | db_name | BINARY(64) | Database name | | 2 | db_name | VARCHAR(64) | Database name |
| 3 | table_type | BINARY(21) | Table type | | 3 | table_type | VARCHAR(21) | Table type |
| 4 | col_name | BINARY(64) | Column name | | 4 | col_name | VARCHAR(64) | Column name |
| 5 | col_type | BINARY(32) | Column type | | 5 | col_type | VARCHAR(32) | Column type |
| 6 | col_length | INT | Column length | | 6 | col_length | INT | Column length |
| 7 | col_precision | INT | Column precision | | 7 | col_precision | INT | Column precision |
| 8 | col_scale | INT | Column scale | | 8 | col_scale | INT | Column scale |
@ -200,8 +210,8 @@ Provides information about TDengine users. Users whose SYSINFO attribute is 0 ca
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | ---------------- | | --- | :---------: | ------------- | ---------------- |
| 1 | user_name | BINARY(23) | User name | | 1 | user_name | VARCHAR(23) | User name |
| 2 | privilege | BINARY(256) | User permissions | | 2 | privilege | VARCHAR(256) | User permissions |
| 3 | create_time | TIMESTAMP | Creation time | | 3 | create_time | TIMESTAMP | Creation time |
## INS_GRANTS ## INS_GRANTS
@ -210,20 +220,20 @@ Provides information about TDengine Enterprise Edition permissions. Users whose
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- | | --- | :---------: | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1 | version | BINARY(9) | Whether the deployment is a licensed or trial version | | 1 | version | VARCHAR(9) | Whether the deployment is a licensed or trial version |
| 2 | cpu_cores | BINARY(9) | CPU cores included in license | | 2 | cpu_cores | VARCHAR(9) | CPU cores included in license |
| 3 | dnodes | BINARY(10) | Dnodes included in license. It should be noted that `dnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 3 | dnodes | VARCHAR(10) | Dnodes included in license. It should be noted that `dnodes` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 4 | streams | BINARY(10) | Streams included in license. It should be noted that `streams` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 4 | streams | VARCHAR(10) | Streams included in license. It should be noted that `streams` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 5 | users | BINARY(10) | Users included in license. It should be noted that `users` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 5 | users | VARCHAR(10) | Users included in license. It should be noted that `users` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 6 | accounts | BINARY(10) | Accounts included in license. It should be noted that `accounts` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 6 | accounts | VARCHAR(10) | Accounts included in license. It should be noted that `accounts` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 7 | storage | BINARY(21) | Storage space included in license. It should be noted that `storage` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 7 | storage | VARCHAR(21) | Storage space included in license. It should be noted that `storage` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 8 | connections | BINARY(21) | Client connections included in license. It should be noted that `connections` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 8 | connections | VARCHAR(21) | Client connections included in license. It should be noted that `connections` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 9 | databases | BINARY(11) | Databases included in license. It should be noted that `databases` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 9 | databases | VARCHAR(11) | Databases included in license. It should be noted that `databases` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 10 | speed | BINARY(9) | Write speed specified in license (data points per second) | | 10 | speed | VARCHAR(9) | Write speed specified in license (data points per second) |
| 11 | querytime | BINARY(9) | Total query time specified in license | | 11 | querytime | VARCHAR(9) | Total query time specified in license |
| 12 | timeseries | BINARY(21) | Number of metrics included in license | | 12 | timeseries | VARCHAR(21) | Number of metrics included in license |
| 13 | expired | BINARY(5) | Whether the license has expired | | 13 | expired | VARCHAR(5) | Whether the license has expired |
| 14 | expire_time | BINARY(19) | When the trial period expires | | 14 | expire_time | VARCHAR(19) | When the trial period expires |
## INS_VGROUPS ## INS_VGROUPS
@ -232,15 +242,15 @@ Provides information about vgroups. Users whose SYSINFO attribute is 0 can't vie
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------------------- | | --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------------------- |
| 1 | vgroup_id | INT | Vgroup ID | | 1 | vgroup_id | INT | Vgroup ID |
| 2 | db_name | BINARY(32) | Database name | | 2 | db_name | VARCHAR(32) | Database name |
| 3 | tables | INT | Tables in vgroup. It should be noted that `tables` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 3 | tables | INT | Tables in vgroup. It should be noted that `tables` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 4 | status | BINARY(10) | Vgroup status | | 4 | status | VARCHAR(10) | Vgroup status |
| 5 | v1_dnode | INT | Dnode ID of first vgroup member | | 5 | v1_dnode | INT | Dnode ID of first vgroup member |
| 6 | v1_status | BINARY(10) | Status of first vgroup member | | 6 | v1_status | VARCHAR(10) | Status of first vgroup member |
| 7 | v2_dnode | INT | Dnode ID of second vgroup member | | 7 | v2_dnode | INT | Dnode ID of second vgroup member |
| 8 | v2_status | BINARY(10) | Status of second vgroup member | | 8 | v2_status | VARCHAR(10) | Status of second vgroup member |
| 9 | v3_dnode | INT | Dnode ID of third vgroup member | | 9 | v3_dnode | INT | Dnode ID of third vgroup member |
| 10 | v3_status | BINARY(10) | Status of third vgroup member | | 10 | v3_status | VARCHAR(10) | Status of third vgroup member |
| 11 | nfiles | INT | Number of data and metadata files in the vgroup | | 11 | nfiles | INT | Number of data and metadata files in the vgroup |
| 12 | file_size | INT | Size of the data and metadata files in the vgroup | | 12 | file_size | INT | Size of the data and metadata files in the vgroup |
| 13 | tsma | TINYINT | Whether time-range-wise SMA is enabled. 1 means enabled; 0 means disabled. | | 13 | tsma | TINYINT | Whether time-range-wise SMA is enabled. 1 means enabled; 0 means disabled. |
@ -251,8 +261,8 @@ Provides system configuration information.
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------- | | --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------- |
| 1 | name | BINARY(32) | Parameter | | 1 | name | VARCHAR(32) | Parameter |
| 2 | value | BINARY(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 2 | value | VARCHAR(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
## INS_DNODE_VARIABLES ## INS_DNODE_VARIABLES
@ -261,40 +271,40 @@ Provides dnode configuration information. Users whose SYSINFO attribute is 0 can
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------- | | --- | :--------: | ------------- | ----------------------------------------------------------------------------------------------------------------------- |
| 1 | dnode_id | INT | Dnode ID | | 1 | dnode_id | INT | Dnode ID |
| 2 | name | BINARY(32) | Parameter | | 2 | name | VARCHAR(32) | Parameter |
| 3 | value | BINARY(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 3 | value | VARCHAR(64) | Value. It should be noted that `value` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
## INS_TOPICS ## INS_TOPICS
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :---------: | ------------- | -------------------------------------- | | --- | :---------: | ------------- | -------------------------------------- |
| 1 | topic_name | BINARY(192) | Topic name | | 1 | topic_name | VARCHAR(192) | Topic name |
| 2 | db_name | BINARY(64) | Database for the topic | | 2 | db_name | VARCHAR(64) | Database for the topic |
| 3 | create_time | TIMESTAMP | Creation time | | 3 | create_time | TIMESTAMP | Creation time |
| 4 | sql | BINARY(1024) | SQL statement used to create the topic | | 4 | sql | VARCHAR(1024) | SQL statement used to create the topic |
## INS_SUBSCRIPTIONS ## INS_SUBSCRIPTIONS
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :------------: | ------------- | --------------------------- | | --- | :------------: | ------------- | --------------------------- |
| 1 | topic_name | BINARY(204) | Subscribed topic | | 1 | topic_name | VARCHAR(204) | Subscribed topic |
| 2 | consumer_group | BINARY(193) | Subscribed consumer group | | 2 | consumer_group | VARCHAR(193) | Subscribed consumer group |
| 3 | vgroup_id | INT | Vgroup ID for the consumer | | 3 | vgroup_id | INT | Vgroup ID for the consumer |
| 4 | consumer_id | BIGINT | Consumer ID | | 4 | consumer_id | BIGINT | Consumer ID |
| 5 | offset | BINARY(64) | Consumption progress | | 5 | offset | VARCHAR(64) | Consumption progress |
| 6 | rows | BIGINT | Number of consumption items | | 6 | rows | BIGINT | Number of consumption items |
## INS_STREAMS ## INS_STREAMS
| # | **Column** | **Data Type** | **Description** | | # | **Column** | **Data Type** | **Description** |
| --- | :----------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | --- | :----------: | ------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1 | stream_name | BINARY(64) | Stream name | | 1 | stream_name | VARCHAR(64) | Stream name |
| 2 | create_time | TIMESTAMP | Creation time | | 2 | create_time | TIMESTAMP | Creation time |
| 3 | sql | BINARY(1024) | SQL statement used to create the stream | | 3 | sql | VARCHAR(1024) | SQL statement used to create the stream |
| 4 | status | BINARY(20) | Current status | | 4 | status | VARCHAR(20) | Current status |
| 5 | source_db | BINARY(64) | Source database | | 5 | source_db | VARCHAR(64) | Source database |
| 6 | target_db | BINARY(64) | Target database | | 6 | target_db | VARCHAR(64) | Target database |
| 7 | target_table | BINARY(192) | Target table | | 7 | target_table | VARCHAR(192) | Target table |
| 8 | watermark | BIGINT | Watermark (see stream processing documentation). It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 8 | watermark | BIGINT | Watermark (see stream processing documentation). It should be noted that `watermark` is a TDengine keyword and needs to be escaped with ` when used as a column name. |
| 9 | trigger | INT | Method of triggering the result push (see stream processing documentation). It should be noted that `trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. | | 9 | trigger | INT | Method of triggering the result push (see stream processing documentation). It should be noted that `trigger` is a TDengine keyword and needs to be escaped with ` when used as a column name. |

View File

@ -57,9 +57,20 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------ | | --- | :---------: | ------------ | ------------ |
| 1 | id | SMALLINT | qnode id | | 1 | id | SMALLINT | qnode id |
| 2 | endpoint | BINARY(134) | qnode 的地址 | | 2 | endpoint | VARCHAR(134) | qnode 的地址 |
| 3 | create_time | TIMESTAMP | 创建时间 | | 3 | create_time | TIMESTAMP | 创建时间 |
## INS_SNODES
当前系统中 SNODE 的信息。也可以使用 SHOW SNODES 来查询这些信息。SYSINFO 属性为 0 的用户不能查看此表。
| # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------ |
| 1 | id | SMALLINT | snode id |
| 2 | endpoint | VARCHAR(134) | snode 的地址 |
| 3 | create_time | TIMESTAMP | 创建时间 |
## INS_CLUSTER ## INS_CLUSTER
存储集群相关信息。 SYSINFO 属性为 0 的用户不能查看此表。 存储集群相关信息。 SYSINFO 属性为 0 的用户不能查看此表。
@ -67,7 +78,7 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ---------- | | --- | :---------: | ------------ | ---------- |
| 1 | id | BIGINT | cluster id | | 1 | id | BIGINT | cluster id |
| 2 | name | BINARY(134) | 集群名称 | | 2 | name | VARCHAR(134) | 集群名称 |
| 3 | create_time | TIMESTAMP | 创建时间 | | 3 | create_time | TIMESTAMP | 创建时间 |
## INS_DATABASES ## INS_DATABASES
@ -111,15 +122,15 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------- | --------------------------------------------------------------------------------------------- | | --- | :-----------: | ------------- | --------------------------------------------------------------------------------------------- |
| 1 | name | BINARY(64) | 函数名 | | 1 | name | VARCHAR(64) | 函数名 |
| 2 | comment | BINARY(255) | 补充说明。需要注意,`comment` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 2 | comment | VARCHAR(255) | 补充说明。需要注意,`comment` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 3 | aggregate | INT | 是否为聚合函数。需要注意,`aggregate` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 3 | aggregate | INT | 是否为聚合函数。需要注意,`aggregate` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 4 | output_type | BINARY(31) | 输出类型 | | 4 | output_type | VARCHAR(31) | 输出类型 |
| 5 | create_time | TIMESTAMP | 创建时间 | | 5 | create_time | TIMESTAMP | 创建时间 |
| 6 | code_len | INT | 代码长度 | | 6 | code_len | INT | 代码长度 |
| 7 | bufsize | INT | buffer 大小 | | 7 | bufsize | INT | buffer 大小 |
| 8 | func_language | BINARY(31) | 自定义函数编程语言 | | 8 | func_language | VARCHAR(31) | 自定义函数编程语言 |
| 9 | func_body | BINARY(16384) | 函数体定义 | | 9 | func_body | VARCHAR(16384) | 函数体定义 |
| 10 | func_version | INT | 函数版本号。初始版本为0每次替换更新版本号加1。 | | 10 | func_version | INT | 函数版本号。初始版本为0每次替换更新版本号加1。 |
@ -129,12 +140,12 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :--------------: | ------------ | ------------------------------------------------------- | | --- | :--------------: | ------------ | ------------------------------------------------------- |
| 1 | db_name | BINARY(32) | 包含此索引的表所在的数据库名 | | 1 | db_name | VARCHAR(32) | 包含此索引的表所在的数据库名 |
| 2 | table_name | BINARY(192) | 包含此索引的表的名称 | | 2 | table_name | VARCHAR(192) | 包含此索引的表的名称 |
| 3 | index_name | BINARY(192) | 索引名 | | 3 | index_name | VARCHAR(192) | 索引名 |
| 4 | column_name | BINARY(64) | 建索引的列的列名 | | 4 | column_name | VARCHAR(64) | 建索引的列的列名 |
| 5 | index_type | BINARY(10) | 目前有 SMA 和 tag | | 5 | index_type | VARCHAR(10) | 目前有 SMA 和 tag |
| 6 | index_extensions | BINARY(256) | 索引的额外信息。对 SMA/tag 类型的索引,是函数名的列表。 | | 6 | index_extensions | VARCHAR(256) | 索引的额外信息。对 SMA/tag 类型的索引,是函数名的列表。 |
## INS_STABLES ## INS_STABLES
@ -142,16 +153,16 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ----------------------------------------------------------------------------------------------------- | | --- | :-----------: | ------------ | ----------------------------------------------------------------------------------------------------- |
| 1 | stable_name | BINARY(192) | 超级表表名 | | 1 | stable_name | VARCHAR(192) | 超级表表名 |
| 2 | db_name | BINARY(64) | 超级表所在的数据库的名称 | | 2 | db_name | VARCHAR(64) | 超级表所在的数据库的名称 |
| 3 | create_time | TIMESTAMP | 创建时间 | | 3 | create_time | TIMESTAMP | 创建时间 |
| 4 | columns | INT | 列数目 | | 4 | columns | INT | 列数目 |
| 5 | tags | INT | 标签数目。需要注意,`tags` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 5 | tags | INT | 标签数目。需要注意,`tags` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 6 | last_update | TIMESTAMP | 最后更新时间 | | 6 | last_update | TIMESTAMP | 最后更新时间 |
| 7 | table_comment | BINARY(1024) | 表注释 | | 7 | table_comment | VARCHAR(1024) | 表注释 |
| 8 | watermark | BINARY(64) | 窗口的关闭时间。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 8 | watermark | VARCHAR(64) | 窗口的关闭时间。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 9 | max_delay | BINARY(64) | 推送计算结果的最大延迟。需要注意,`max_delay` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 9 | max_delay | VARCHAR(64) | 推送计算结果的最大延迟。需要注意,`max_delay` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 10 | rollup | BINARY(128) | rollup 聚合函数。需要注意,`rollup` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 10 | rollup | VARCHAR(128) | rollup 聚合函数。需要注意,`rollup` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
## INS_TABLES ## INS_TABLES
@ -159,37 +170,37 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ------------------------------------------------------------------------------------- | | --- | :-----------: | ------------ | ------------------------------------------------------------------------------------- |
| 1 | table_name | BINARY(192) | 表名 | | 1 | table_name | VARCHAR(192) | 表名 |
| 2 | db_name | BINARY(64) | 数据库名 | | 2 | db_name | VARCHAR(64) | 数据库名 |
| 3 | create_time | TIMESTAMP | 创建时间 | | 3 | create_time | TIMESTAMP | 创建时间 |
| 4 | columns | INT | 列数目 | | 4 | columns | INT | 列数目 |
| 5 | stable_name | BINARY(192) | 所属的超级表表名 | | 5 | stable_name | VARCHAR(192) | 所属的超级表表名 |
| 6 | uid | BIGINT | 表 id | | 6 | uid | BIGINT | 表 id |
| 7 | vgroup_id | INT | vgroup id | | 7 | vgroup_id | INT | vgroup id |
| 8 | ttl | INT | 表的生命周期。需要注意,`ttl` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 8 | ttl | INT | 表的生命周期。需要注意,`ttl` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 9 | table_comment | BINARY(1024) | 表注释 | | 9 | table_comment | VARCHAR(1024) | 表注释 |
| 10 | type | BINARY(21) | 表类型 | | 10 | type | VARCHAR(21) | 表类型 |
## INS_TAGS ## INS_TAGS
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------- | ---------------------- | | --- | :---------: | ------------- | ---------------------- |
| 1 | table_name | BINARY(192) | 表名 | | 1 | table_name | VARCHAR(192) | 表名 |
| 2 | db_name | BINARY(64) | 该表所在的数据库的名称 | | 2 | db_name | VARCHAR(64) | 该表所在的数据库的名称 |
| 3 | stable_name | BINARY(192) | 所属的超级表表名 | | 3 | stable_name | VARCHAR(192) | 所属的超级表表名 |
| 4 | tag_name | BINARY(64) | tag 的名称 | | 4 | tag_name | VARCHAR(64) | tag 的名称 |
| 5 | tag_type | BINARY(64) | tag 的类型 | | 5 | tag_type | VARCHAR(64) | tag 的类型 |
| 6 | tag_value | BINARY(16384) | tag 的值 | | 6 | tag_value | VARCHAR(16384) | tag 的值 |
## INS_COLUMNS ## INS_COLUMNS
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :-----------: | ------------ | ---------------------- | | --- | :-----------: | ------------ | ---------------------- |
| 1 | table_name | BINARY(192) | 表名 | | 1 | table_name | VARCHAR(192) | 表名 |
| 2 | db_name | BINARY(64) | 该表所在的数据库的名称 | | 2 | db_name | VARCHAR(64) | 该表所在的数据库的名称 |
| 3 | table_type | BINARY(21) | 表类型 | | 3 | table_type | VARCHAR(21) | 表类型 |
| 4 | col_name | BINARY(64) | 列 的名称 | | 4 | col_name | VARCHAR(64) | 列 的名称 |
| 5 | col_type | BINARY(32) | 列 的类型 | | 5 | col_type | VARCHAR(32) | 列 的类型 |
| 6 | col_length | INT | 列 的长度 | | 6 | col_length | INT | 列 的长度 |
| 7 | col_precision | INT | 列 的精度 | | 7 | col_precision | INT | 列 的精度 |
| 8 | col_scale | INT | 列 的比例 | | 8 | col_scale | INT | 列 的比例 |
@ -201,8 +212,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | -------- | | --- | :---------: | ------------ | -------- |
| 1 | user_name | BINARY(23) | 用户名 | | 1 | user_name | VARCHAR(23) | 用户名 |
| 2 | privilege | BINARY(256) | 权限 | | 2 | privilege | VARCHAR(256) | 权限 |
| 3 | create_time | TIMESTAMP | 创建时间 | | 3 | create_time | TIMESTAMP | 创建时间 |
## INS_GRANTS ## INS_GRANTS
@ -211,20 +222,20 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | --------------------------------------------------------------------------------------------------------- | | --- | :---------: | ------------ | --------------------------------------------------------------------------------------------------------- |
| 1 | version | BINARY(9) | 企业版授权说明official(官方授权的)/trial(试用的) | | 1 | version | VARCHAR(9) | 企业版授权说明official(官方授权的)/trial(试用的) |
| 2 | cpu_cores | BINARY(9) | 授权使用的 CPU 核心数量 | | 2 | cpu_cores | VARCHAR(9) | 授权使用的 CPU 核心数量 |
| 3 | dnodes | BINARY(10) | 授权使用的 dnode 节点数量。需要注意,`dnodes` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 3 | dnodes | VARCHAR(10) | 授权使用的 dnode 节点数量。需要注意,`dnodes` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 4 | streams | BINARY(10) | 授权创建的流数量。需要注意,`streams` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 4 | streams | VARCHAR(10) | 授权创建的流数量。需要注意,`streams` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 5 | users | BINARY(10) | 授权创建的用户数量。需要注意,`users` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 5 | users | VARCHAR(10) | 授权创建的用户数量。需要注意,`users` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 6 | accounts | BINARY(10) | 授权创建的帐户数量。需要注意,`accounts` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 6 | accounts | VARCHAR(10) | 授权创建的帐户数量。需要注意,`accounts` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 7 | storage | BINARY(21) | 授权使用的存储空间大小。需要注意,`storage` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 7 | storage | VARCHAR(21) | 授权使用的存储空间大小。需要注意,`storage` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 8 | connections | BINARY(21) | 授权使用的客户端连接数量。需要注意,`connections` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 8 | connections | VARCHAR(21) | 授权使用的客户端连接数量。需要注意,`connections` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 9 | databases | BINARY(11) | 授权使用的数据库数量。需要注意,`databases` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 9 | databases | VARCHAR(11) | 授权使用的数据库数量。需要注意,`databases` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 10 | speed | BINARY(9) | 授权使用的数据点每秒写入数量 | | 10 | speed | VARCHAR(9) | 授权使用的数据点每秒写入数量 |
| 11 | querytime | BINARY(9) | 授权使用的查询总时长 | | 11 | querytime | VARCHAR(9) | 授权使用的查询总时长 |
| 12 | timeseries | BINARY(21) | 授权使用的测点数量 | | 12 | timeseries | VARCHAR(21) | 授权使用的测点数量 |
| 13 | expired | BINARY(5) | 是否到期true到期false未到期 | | 13 | expired | VARCHAR(5) | 是否到期true到期false未到期 |
| 14 | expire_time | BINARY(19) | 试用期到期时间 | | 14 | expire_time | VARCHAR(19) | 试用期到期时间 |
## INS_VGROUPS ## INS_VGROUPS
@ -233,15 +244,15 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :-------: | ------------ | ------------------------------------------------------------------------------------------------ | | --- | :-------: | ------------ | ------------------------------------------------------------------------------------------------ |
| 1 | vgroup_id | INT | vgroup id | | 1 | vgroup_id | INT | vgroup id |
| 2 | db_name | BINARY(32) | 数据库名 | | 2 | db_name | VARCHAR(32) | 数据库名 |
| 3 | tables | INT | 此 vgroup 内有多少表。需要注意,`tables` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 3 | tables | INT | 此 vgroup 内有多少表。需要注意,`tables` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 4 | status | BINARY(10) | 此 vgroup 的状态 | | 4 | status | VARCHAR(10) | 此 vgroup 的状态 |
| 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id | | 5 | v1_dnode | INT | 第一个成员所在的 dnode 的 id |
| 6 | v1_status | BINARY(10) | 第一个成员的状态 | | 6 | v1_status | VARCHAR(10) | 第一个成员的状态 |
| 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id | | 7 | v2_dnode | INT | 第二个成员所在的 dnode 的 id |
| 8 | v2_status | BINARY(10) | 第二个成员的状态 | | 8 | v2_status | VARCHAR(10) | 第二个成员的状态 |
| 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id | | 9 | v3_dnode | INT | 第三个成员所在的 dnode 的 id |
| 10 | v3_status | BINARY(10) | 第三个成员的状态 | | 10 | v3_status | VARCHAR(10) | 第三个成员的状态 |
| 11 | nfiles | INT | 此 vgroup 中数据/元数据文件的数量 | | 11 | nfiles | INT | 此 vgroup 中数据/元数据文件的数量 |
| 12 | file_size | INT | 此 vgroup 中数据/元数据文件的大小 | | 12 | file_size | INT | 此 vgroup 中数据/元数据文件的大小 |
| 13 | tsma | TINYINT | 此 vgroup 是否专用于 Time-range-wise SMA1: 是, 0: 否 | | 13 | tsma | TINYINT | 此 vgroup 是否专用于 Time-range-wise SMA1: 是, 0: 否 |
@ -252,8 +263,8 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :------: | ------------ | --------------------------------------------------------------------------------------- | | --- | :------: | ------------ | --------------------------------------------------------------------------------------- |
| 1 | name | BINARY(32) | 配置项名称 | | 1 | name | VARCHAR(32) | 配置项名称 |
| 2 | value | BINARY(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 2 | value | VARCHAR(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
## INS_DNODE_VARIABLES ## INS_DNODE_VARIABLES
@ -262,40 +273,40 @@ TDengine 内置了一个名为 `INFORMATION_SCHEMA` 的数据库,提供对数
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :------: | ------------ | --------------------------------------------------------------------------------------- | | --- | :------: | ------------ | --------------------------------------------------------------------------------------- |
| 1 | dnode_id | INT | dnode 的 ID | | 1 | dnode_id | INT | dnode 的 ID |
| 2 | name | BINARY(32) | 配置项名称 | | 2 | name | VARCHAR(32) | 配置项名称 |
| 3 | value | BINARY(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 3 | value | VARCHAR(64) | 该配置项的值。需要注意,`value` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
## INS_TOPICS ## INS_TOPICS
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :---------: | ------------ | ------------------------------ | | --- | :---------: | ------------ | ------------------------------ |
| 1 | topic_name | BINARY(192) | topic 名称 | | 1 | topic_name | VARCHAR(192) | topic 名称 |
| 2 | db_name | BINARY(64) | topic 相关的 DB | | 2 | db_name | VARCHAR(64) | topic 相关的 DB |
| 3 | create_time | TIMESTAMP | topic 的 创建时间 | | 3 | create_time | TIMESTAMP | topic 的 创建时间 |
| 4 | sql | BINARY(1024) | 创建该 topic 时所用的 SQL 语句 | | 4 | sql | VARCHAR(1024) | 创建该 topic 时所用的 SQL 语句 |
## INS_SUBSCRIPTIONS ## INS_SUBSCRIPTIONS
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :------------: | ------------ | ------------------------ | | --- | :------------: | ------------ | ------------------------ |
| 1 | topic_name | BINARY(204) | 被订阅的 topic | | 1 | topic_name | VARCHAR(204) | 被订阅的 topic |
| 2 | consumer_group | BINARY(193) | 订阅者的消费者组 | | 2 | consumer_group | VARCHAR(193) | 订阅者的消费者组 |
| 3 | vgroup_id | INT | 消费者被分配的 vgroup id | | 3 | vgroup_id | INT | 消费者被分配的 vgroup id |
| 4 | consumer_id | BIGINT | 消费者的唯一 id | | 4 | consumer_id | BIGINT | 消费者的唯一 id |
| 5 | offset | BINARY(64) | 消费者的消费进度 | | 5 | offset | VARCHAR(64) | 消费者的消费进度 |
| 6 | rows | BIGINT | 消费者的消费的数据条数 | | 6 | rows | BIGINT | 消费者的消费的数据条数 |
## INS_STREAMS ## INS_STREAMS
| # | **列名** | **数据类型** | **说明** | | # | **列名** | **数据类型** | **说明** |
| --- | :----------: | ------------ | -------------------------------------------------------------------------------------------------------------------- | | --- | :----------: | ------------ | -------------------------------------------------------------------------------------------------------------------- |
| 1 | stream_name | BINARY(64) | 流计算名称 | | 1 | stream_name | VARCHAR(64) | 流计算名称 |
| 2 | create_time | TIMESTAMP | 创建时间 | | 2 | create_time | TIMESTAMP | 创建时间 |
| 3 | sql | BINARY(1024) | 创建流计算时提供的 SQL 语句 | | 3 | sql | VARCHAR(1024) | 创建流计算时提供的 SQL 语句 |
| 4 | status | BINARY(20) | 流当前状态 | | 4 | status | VARCHAR(20) | 流当前状态 |
| 5 | source_db | BINARY(64) | 源数据库 | | 5 | source_db | VARCHAR(64) | 源数据库 |
| 6 | target_db | BINARY(64) | 目的数据库 | | 6 | target_db | VARCHAR(64) | 目的数据库 |
| 7 | target_table | BINARY(192) | 流计算写入的目标表 | | 7 | target_table | VARCHAR(192) | 流计算写入的目标表 |
| 8 | watermark | BIGINT | watermark详见 SQL 手册流式计算。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 8 | watermark | BIGINT | watermark详见 SQL 手册流式计算。需要注意,`watermark` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |
| 9 | trigger | INT | 计算结果推送模式,详见 SQL 手册流式计算。需要注意,`trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 | | 9 | trigger | INT | 计算结果推送模式,详见 SQL 手册流式计算。需要注意,`trigger` 为 TDengine 关键字,作为列名使用时需要使用 ` 进行转义。 |

View File

@ -108,7 +108,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData);
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow); int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);
int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
void tRowDestroy(SRow *pRow); void tRowDestroy(SRow *pRow);
void tRowSort(SArray *aRowP); int32_t tRowSort(SArray *aRowP);
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag);
int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag);

View File

@ -712,7 +712,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
int8_t isSucceed); int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,

View File

@ -54,6 +54,17 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void
*/ */
void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn); void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn);
/**
* merge sort, with the compare function requiring additional parameters support
*
* @param src
* @param numOfElem
* @param size
* @param comparFn
* @return int32_t 0 for success, other for failure.
*/
int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn);
/** /**
* binary search, with range support * binary search, with range support
* *

View File

@ -214,12 +214,19 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp);
void taosArraySwap(SArray* a, SArray* b); void taosArraySwap(SArray* a, SArray* b);
/** /**
* sort the array * sort the array use qsort
* @param pArray * @param pArray
* @param compar * @param compar
*/ */
void taosArraySort(SArray* pArray, __compar_fn_t comparFn); void taosArraySort(SArray* pArray, __compar_fn_t comparFn);
/**
* sort the array use merge sort
* @param pArray
* @param compar
*/
int32_t taosArrayMSort(SArray* pArray, __compar_fn_t comparFn);
/** /**
* search the array * search the array
* @param pArray * @param pArray

View File

@ -610,9 +610,13 @@ _exit:
return code; return code;
} }
void tRowSort(SArray *aRowP) { int32_t tRowSort(SArray *aRowP) {
if (TARRAY_SIZE(aRowP) <= 1) return; if (TARRAY_SIZE(aRowP) <= 1) return 0;
taosArraySort(aRowP, tRowPCmprFn); int32_t code = taosArrayMSort(aRowP, tRowPCmprFn);
if (code != TSDB_CODE_SUCCESS) {
uError("taosArrayMSort failed caused by %d", code);
}
return code;
} }
int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) { int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag) {

View File

@ -401,7 +401,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){ if(pSub == NULL){
#ifdef TMQ_DEBUG
ASSERT(0); ASSERT(0);
#endif
continue; continue;
} }
taosWLockLatch(&pSub->lock); taosWLockLatch(&pSub->lock);
@ -499,7 +501,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created // txn guarantees pSub is created
if(pSub == NULL) { if(pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0); ASSERT(0);
#endif
continue; continue;
} }
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
@ -510,7 +514,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema // 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if(pTopic == NULL) { if(pTopic == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0); ASSERT(0);
#endif
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
continue; continue;
@ -649,7 +655,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} }
// check topic existence // check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) { if (pTrans == NULL) {
goto _over; goto _over;
} }

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndStb.h" #include "mndStb.h"
#include "audit.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndIndex.h" #include "mndIndex.h"
@ -31,7 +32,6 @@
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "tname.h" #include "tname.h"
#include "audit.h"
#define STB_VER_NUMBER 1 #define STB_VER_NUMBER 1
#define STB_RESERVE_SIZE 64 #define STB_RESERVE_SIZE 64
@ -858,6 +858,23 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
} }
return 0; return 0;
} }
static int32_t mndGenIdxNameForFirstTag(char *fullname, char *dbname, char *tagname) {
char randStr[TSDB_COL_NAME_LEN] = {0};
int32_t left = TSDB_COL_NAME_LEN - strlen(tagname) - 1;
if (left <= 1) {
sprintf(fullname, "%s.%s", dbname, tagname);
} else {
int8_t start = left < 8 ? 0 : 8;
int8_t end = left >= 24 ? 24 : left - 1;
// gen rand str len [base:end]
// note: ignore rand performance issues
int64_t len = taosRand() % (end - start + 1) + start;
taosRandStr2(randStr, len);
sprintf(fullname, "%s.%s_%s", dbname, tagname, randStr);
}
return 0;
}
static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) { static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
SStbObj stbObj = {0}; SStbObj stbObj = {0};
@ -871,10 +888,8 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name); mInfo("trans:%d, used to create stb:%s", pTrans->id, pCreate->name);
if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER; if (mndBuildStbFromReq(pMnode, &stbObj, pCreate, pDb) != 0) goto _OVER;
char randStr[24] = {0};
taosRandStr2(randStr, tListLen(randStr) - 1);
SSchema *pSchema = &(stbObj.pTags[0]); SSchema *pSchema = &(stbObj.pTags[0]);
sprintf(fullIdxName, "%s.%s_%s", pDb->name, pSchema->name, randStr); mndGenIdxNameForFirstTag(fullIdxName, pDb->name, pSchema->name);
SSIdx idx = {0}; SSIdx idx = {0};
if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) { if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) {
@ -3273,7 +3288,7 @@ static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb
return p->info.rows; return p->info.rows;
} }
static int8_t determineBuildColForWhichDBs(const char* db) { static int8_t determineBuildColForWhichDBs(const char *db) {
int8_t buildWhichDBs; int8_t buildWhichDBs;
if (!db[0]) if (!db[0])
buildWhichDBs = BUILD_COL_FOR_ALL_DB; buildWhichDBs = BUILD_COL_FOR_ALL_DB;
@ -3291,11 +3306,11 @@ static int8_t determineBuildColForWhichDBs(const char* db) {
} }
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
uint8_t buildWhichDBs; uint8_t buildWhichDBs;
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
buildWhichDBs = determineBuildColForWhichDBs(pShow->db); buildWhichDBs = determineBuildColForWhichDBs(pShow->db);

View File

@ -875,7 +875,11 @@ end:
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; if (code != 0) {
mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic);
return code;
}
return TSDB_CODE_ACTION_IN_PROGRESS;
} }
void mndCleanupSubscribe(SMnode *pMnode) {} void mndCleanupSubscribe(SMnode *pMnode) {}

View File

@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
} }
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
code = 0; code = 0;
taosWUnLockLatch(&pTq->lock);
goto end; goto end;
} }
// 2. check consumer-vg assignment status // 2. check consumer-vg assignment status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != req.consumerId) { if (pHandle->consumerId != req.consumerId) {
tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
req.consumerId, vgId, req.subKey, pHandle->consumerId); req.consumerId, vgId, req.subKey, pHandle->consumerId);
taosRUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
goto end; goto end;
} }
@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
taosRUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
end: end:
rsp.code = code; rsp.code = code;
@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
// 1. find handle // 1. find handle
taosRLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey); tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
taosRUnLockLatch(&pTq->lock);
return -1; return -1;
} }
// 2. check re-balance status // 2. check re-balance status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, vgId, req.subKey, pHandle->consumerId); consumerId, vgId, req.subKey, pHandle->consumerId);
@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
bool exec = tqIsHandleExec(pHandle); bool exec = tqIsHandleExec(pHandle);
if(exec){ if(exec){
tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
pHandle->subKey, pHandle); pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
taosMsleep(10); taosMsleep(10);
@ -667,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle* pHandle = NULL; STqHandle* pHandle = NULL;
while (1) { while (1) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) { if (pHandle) {
break;
}
taosRLockLatch(&pTq->lock);
ret = tqMetaGetHandle(pTq, req.subKey);
taosRUnLockLatch(&pTq->lock);
if (ret < 0) {
break; break;
} }
} }
@ -687,21 +697,33 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDestroyTqHandle(&handle); tqDestroyTqHandle(&handle);
goto end; goto end;
} }
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
} else {
taosWLockLatch(&pTq->lock); taosWLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
} else {
while(1){
taosWLockLatch(&pTq->lock);
bool exec = tqIsHandleExec(pHandle);
if(exec){
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
taosMsleep(10);
continue;
}
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}
taosWUnLockLatch(&pTq->lock);
break;
}
} }
end: end:
@ -1086,8 +1108,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task set status to be dropping", id); tqDebug("s-task:%s fill-history task set status to be dropping", id);
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
streamBuildAndSendDropTaskMsg(pTask, pMeta->vgId, &pTask->id);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return -1; return -1;
} }

View File

@ -367,7 +367,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
bool tqNextBlockInWal(STqReader* pReader, const char* id) { bool tqNextBlockInWal(STqReader* pReader, const char* id) {
SWalReader* pWalReader = pReader->pWalReader; SWalReader* pWalReader = pReader->pWalReader;
// uint64_t st = taosGetTimestampMs(); uint64_t st = taosGetTimestampMs();
while (1) { while (1) {
SArray* pBlockList = pReader->submit.aSubmitTbData; SArray* pBlockList = pReader->submit.aSubmitTbData;
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
@ -442,9 +442,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
pReader->msg.msgStr = NULL; pReader->msg.msgStr = NULL;
// if(taosGetTimestampMs() - st > 5){ if(taosGetTimestampMs() - st > 1000){
// return false; return false;
// } }
} }
} }
@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle // update the table list for each consumer handle
taosWLockLatch(&pTq->lock);
while (1) { while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter); pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) { if (pIter == NULL) {
@ -1116,6 +1117,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list); taosArrayDestroy(list);
taosHashCancelIterate(pTq->pHandle, pIter); taosHashCancelIterate(pTq->pHandle, pIter);
taosWUnLockLatch(&pTq->lock);
return ret; return ret;
} }
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL); tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
@ -1125,6 +1128,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
} }
} }
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader // update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);

View File

@ -200,7 +200,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle); code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err; if (code) goto _err;
taosWLockLatch(&pTq->lock);
code = tqMetaSaveHandle(pTq, handle.subKey, &handle); code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
taosWUnLockLatch(&pTq->lock);
if (code < 0) goto _err; if (code < 0) goto _err;
tDecoderClear(pDecoder); tDecoderClear(pDecoder);

View File

@ -218,7 +218,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
walReaderVerifyOffset(pHandle->pWalReader, offset); walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version; int64_t fetchVer = offset->version;
// uint64_t st = taosGetTimestampMs(); uint64_t st = taosGetTimestampMs();
int totalRows = 0; int totalRows = 0;
while (1) { while (1) {
// int32_t savedEpoch = atomic_load_32(&pHandle->epoch); // int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
@ -265,8 +265,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end; goto end;
} }
// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) { if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) {
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end; goto end;

View File

@ -1045,6 +1045,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
writer[0]->precision = pTsdb->keepCfg.precision; writer[0]->precision = pTsdb->keepCfg.precision;
writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; writer[0]->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; writer[0]->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
writer[0]->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS); writer[0]->commitID = tsdbFSAllocEid(pTsdb->pFS);
writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize; writer[0]->szPage = pTsdb->pVnode->config.tsdbPageSize;
writer[0]->compactVersion = INT64_MAX; writer[0]->compactVersion = INT64_MAX;

View File

@ -289,8 +289,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
} }
if (disorderTs) { if (disorderTs) {
tRowSort(tbData.aRowP); if ((tRowSort(tbData.aRowP) != TSDB_CODE_SUCCESS) ||
if ((terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) { (terrno = tRowMerge(tbData.aRowP, (STSchema*)pTSchema, 0)) != 0) {
goto _end; goto _end;
} }
} }

View File

@ -495,9 +495,9 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
tColDataSortMerge(pTableCxt->pData->aCol); tColDataSortMerge(pTableCxt->pData->aCol);
} else { } else {
if (!pTableCxt->ordered) { if (!pTableCxt->ordered) {
tRowSort(pTableCxt->pData->aRowP); code = tRowSort(pTableCxt->pData->aRowP);
} }
if (!pTableCxt->ordered || pTableCxt->duplicateTs) { if (code == TSDB_CODE_SUCCESS && (!pTableCxt->ordered || pTableCxt->duplicateTs)) {
code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0); code = tRowMerge(pTableCxt->pData->aRowP, pTableCxt->pSchema, 0);
} }
} }

View File

@ -990,6 +990,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
} }
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -303,7 +303,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId); pTask->id.idStr, (int32_t) pTask->streamTaskId.taskId);
// 1. free it and remove fill-history task from disk meta-store // 1. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 2. save to disk // 2. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
@ -365,8 +365,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// 4. free it and remove fill-history task from disk meta-store // 4. free it and remove fill-history task from disk meta-store
// streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
streamBuildAndSendDropTaskMsg(pStreamTask, pMeta->vgId, &pTask->id);
// 5. clear the link between fill-history task and stream task info // 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->historyTaskId.taskId = 0;
@ -411,7 +410,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task } else { // drop fill-history task
streamBuildAndSendDropTaskMsg(pTask, pTask->pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id);
} }
return code; return code;

View File

@ -644,7 +644,7 @@ int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
return status; return status;
} }
int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamTaskId* pTaskId) { int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -656,7 +656,7 @@ int32_t streamBuildAndSendDropTaskMsg(SStreamTask* pTask, int32_t vgId, SStreamT
pReq->streamId = pTaskId->streamId; pReq->streamId = pTaskId->streamId;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
int32_t code = tmsgPutToQueue(pTask->pMsgCb, WRITE_QUEUE, &msg); int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); qError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
return code; return code;

View File

@ -86,8 +86,8 @@ void taosRandStr(char* str, int32_t size) {
} }
void taosRandStr2(char* str, int32_t size) { void taosRandStr2(char* str, int32_t size) {
const char* set = "abcdefghijklmnopqrstuvwxyz0123456789"; const char* set = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ@";
int32_t len = 36; int32_t len = strlen(set);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
str[i] = set[taosRand() % len]; str[i] = set[taosRand() % len];

View File

@ -273,3 +273,86 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar,
taosMemoryFree(buf); taosMemoryFree(buf);
} }
static void taosMerge(void *src, int32_t start, int32_t leftend, int32_t end, int64_t size, const void *param,
__ext_compar_fn_t comparFn, void *tmp) {
int32_t leftSize = leftend - start + 1;
int32_t rightSize = end - leftend;
void *leftBuf = tmp;
void *rightBuf = (char *)tmp + (leftSize * size);
memcpy(leftBuf, elePtrAt(src, size, start), leftSize * size);
memcpy(rightBuf, elePtrAt(src, size, leftend + 1), rightSize * size);
int32_t i = 0, j = 0, k = start;
while (i < leftSize && j < rightSize) {
int32_t ret = comparFn(elePtrAt(leftBuf, size, i), elePtrAt(rightBuf, size, j), param);
if (ret <= 0) {
memcpy(elePtrAt(src, size, k), elePtrAt(leftBuf, size, i), size);
i++;
} else {
memcpy(elePtrAt(src, size, k), elePtrAt(rightBuf, size, j), size);
j++;
}
k++;
}
while (i < leftSize) {
memcpy(elePtrAt(src, size, k), elePtrAt(leftBuf, size, i), size);
i++;
k++;
}
while (j < rightSize) {
memcpy(elePtrAt(src, size, k), elePtrAt(rightBuf, size, j), size);
j++;
k++;
}
}
static int32_t taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, const void *param,
__ext_compar_fn_t comparFn) {
// short array sort, instead of merge sort process
const int32_t THRESHOLD_SIZE = 6;
char *buf = taosMemoryCalloc(1, size); // prepare the swap buffer
if (buf == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (int32_t start = 0; start < numOfElem - 1; start += THRESHOLD_SIZE) {
int32_t end = (start + THRESHOLD_SIZE - 1) <= numOfElem - 1 ? (start + THRESHOLD_SIZE - 1) : numOfElem - 1;
tInsertSort(src, size, start, end, param, comparFn, buf);
}
taosMemoryFreeClear(buf);
if (numOfElem > THRESHOLD_SIZE) {
int32_t currSize;
void *tmp = taosMemoryMalloc(numOfElem * size);
if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY;
for (currSize = THRESHOLD_SIZE; currSize <= numOfElem - 1; currSize = 2 * currSize) {
int32_t leftStart;
for (leftStart = 0; leftStart < numOfElem - 1; leftStart += 2 * currSize) {
int32_t leftend = leftStart + currSize - 1;
int32_t rightEnd =
(leftStart + 2 * currSize - 1 < numOfElem - 1) ? (leftStart + 2 * currSize - 1) : (numOfElem - 1);
if (leftend >= rightEnd) break;
taosMerge(src, leftStart, leftend, rightEnd, size, param, comparFn, tmp);
}
}
taosMemoryFreeClear(tmp);
}
return 0;
}
int32_t msortHelper(const void *p1, const void *p2, const void *param) {
__compar_fn_t comparFn = param;
return comparFn(p1, p2);
}
int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) {
void *param = comparFn;
return taosMergeSortHelper(src, numOfElem, size, param, msortHelper);
}

View File

@ -417,6 +417,10 @@ void taosArraySort(SArray* pArray, __compar_fn_t compar) {
taosSort(pArray->pData, pArray->size, pArray->elemSize, compar); taosSort(pArray->pData, pArray->size, pArray->elemSize, compar);
} }
int32_t taosArrayMSort(SArray* pArray, __compar_fn_t compar) {
return taosMergeSort(pArray->pData, pArray->size, pArray->elemSize, compar);
}
void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) {
return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags); return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
} }

View File

@ -84,3 +84,11 @@ add_test(
NAME pageBufferTest NAME pageBufferTest
COMMAND pageBufferTest COMMAND pageBufferTest
) )
# talgoTest
add_executable(talgoTest "talgoTest.cpp")
target_link_libraries(talgoTest os util gtest_main)
add_test(
NAME talgoTest
COMMAND talgoTest
)

View File

@ -0,0 +1,104 @@
#include <gtest/gtest.h>
#include <stdlib.h>
#include "talgo.h"
struct TestStruct {
int a;
float b;
};
// Define a custom comparison function for testing
int cmpFunc(const void* a, const void* b) {
const TestStruct* pa = reinterpret_cast<const TestStruct*>(a);
const TestStruct* pb = reinterpret_cast<const TestStruct*>(b);
if (pa->a < pb->a) {
return -1;
} else if (pa->a > pb->a) {
return 1;
} else {
return 0;
}
}
TEST(utilTest, taosMSort) {
// Create an array of test data
TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}};
// Sort the array using taosSort
taosMergeSort(arr, 6, sizeof(TestStruct), cmpFunc);
for (int i = 0; i < sizeof(arr) / sizeof(TestStruct); i++) {
printf("%d: %d %f\n", i, arr[i].a, arr[i].b);
}
// Check that the array is sorted correctly
EXPECT_EQ(arr[0].a, 1);
EXPECT_EQ(arr[1].a, 2);
EXPECT_EQ(arr[2].a, 3);
EXPECT_EQ(arr[2].b, 6);
EXPECT_EQ(arr[3].a, 3);
EXPECT_EQ(arr[3].b, 2);
EXPECT_EQ(arr[4].a, 3);
EXPECT_EQ(arr[4].b, 5);
EXPECT_EQ(arr[5].a, 4);
}
int cmpInt(const void* a, const void* b) {
int int_a = *((int*)a);
int int_b = *((int*)b);
if (int_a == int_b)
return 0;
else if (int_a < int_b)
return -1;
else
return 1;
}
TEST(utilTest, taosMSort2) {
clock_t start_time, end_time;
double cpu_time_used;
int times = 10000;
start_time = clock();
for (int i = 0; i < 10000; i++) {
TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}};
taosMergeSort(arr, 6, sizeof(TestStruct), cmpFunc);
}
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosMSort %d times: %f s\n", times, cpu_time_used);
start_time = clock();
for (int i = 0; i < 10000; i++) {
TestStruct arr[] = {{4, 2.5}, {3, 6}, {2, 1.5}, {3, 2}, {1, 3.5}, {3, 5}};
taosSort(arr, 6, sizeof(TestStruct), cmpFunc);
}
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosSort %d times: %f s\n", times, cpu_time_used);
const int arraySize = 1000000;
int data1[arraySize];
int data2[arraySize];
for (int i = 0; i < arraySize; ++i) {
data1[i] = taosRand();
data2[i] = data1[i];
}
start_time = clock();
taosMergeSort(data1, arraySize, sizeof(int), cmpInt);
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosMSort length:%d cost: %f s\n", arraySize, cpu_time_used);
start_time = clock();
taosSort(data2, arraySize, sizeof(int), cmpInt);
end_time = clock();
cpu_time_used = ((double)(end_time - start_time)) / CLOCKS_PER_SEC;
printf("taosSort length:%d cost: %f s\n", arraySize, cpu_time_used);
for (int i = 0; i < arraySize - 1; i++) {
EXPECT_EQ(data1[i], data2[i]);
ASSERT_LE(data1[i], data1[i+1]);
}
}

View File

@ -292,11 +292,11 @@ if $rows != 1 then
return -1 return -1
endi endi
sql drop index $data[0][0] #sql drop index $data[0][0]
if $rows != 0 then #if $rows != 0 then
return -1 #return -1
endi #endi
sql_error drop index t2 sql_error drop index t2
@ -304,7 +304,7 @@ sql_error drop index t3
sql create index ti0 on $mtPrefix (t1) #sql create index ti0 on $mtPrefix (t1)
$i = $interval $i = $interval
while $i < $limit while $i < $limit

View File

@ -336,7 +336,7 @@ class TDTestCase:
for i in range(expectRows): for i in range(expectRows):
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: if totalConsumeRows > expectrowcnt or totalConsumeRows < 0:
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")

View File

@ -218,7 +218,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0] actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -216,7 +216,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0] actConsumeTotalRows = resultList[0]
tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted)) tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted))
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -218,7 +218,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10) time.sleep(10)

View File

@ -216,7 +216,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0] actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -217,7 +217,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)): if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)):
tdLog.exit("%d tmq consume rows error!"%consumerId) tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10) time.sleep(10)