fix:conflicts from 3.0

This commit is contained in:
wangmm0220 2024-01-26 16:32:40 +08:00
commit 35e2b681c7
68 changed files with 1480 additions and 1182 deletions

View File

@ -101,7 +101,7 @@ Query OK, 2 row(s) in set (0.004076s)
## Query Examples
If you want query the data of "tags": {"location": "California.LosAngeles", "groupid": 1}, here is the query SQL:
If you want query the data of "tags": {"location": "California.LosAngeles", "groupid": 1}, here is the query SQL:
```sql
SELECT * FROM `meters_current` WHERE location = "California.LosAngeles" AND groupid = 3;

View File

@ -22,7 +22,7 @@ import CAsync from "./_c_async.mdx";
SQL is used by TDengine as its query language. Application programs can send SQL statements to TDengine through REST API or client libraries. TDengine's CLI `taos` can also be used to execute ad hoc SQL queries. Here is the list of major query functionalities supported by TDengine:
- Query on single column or multiple columns
- Filter on tags or data columns: >, <, =, <\>, like
- Filter on tags or data columns: &gt;, &lt;, =, &lt;&gt;, like
- Grouping of results: `Group By` - Sorting of results: `Order By` - Limit the number of results: `Limit/Offset`
- Windowed aggregate queries for time windows (interval), session windows (session), and state windows (state_window)
- Arithmetic on columns of numeric types or aggregate results
@ -159,7 +159,7 @@ In the section describing [Insert](../insert-data/sql-writing), a database named
:::note
1. With either REST connection or native connection, the above sample code works well.
2. Please note that `use db` can't be used in case of REST connection because it's stateless. You can specify the database name by either the REST endpoint's parameter or <db_name>.<table_name> in the SQL command.
2. Please note that `use db` can't be used in case of REST connection because it's stateless. You can specify the database name by either the REST endpoint's parameter or &lt;db_name&gt;.&lt;table_name&gt; in the SQL command.
:::

View File

@ -104,7 +104,7 @@ Replace `aggfn` with the name of your function.
### UDF Interface Definition in C
There are strict naming conventions for interface functions. The names of the start, finish, init, and destroy interfaces must be <udf-name\>_start, <udf-name\>_finish, <udf-name\>_init, and <udf-name\>_destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
There are strict naming conventions for interface functions. The names of the start, finish, init, and destroy interfaces must be &lt;udf-name&gt;_start, &lt;udf-name&gt;_finish, &lt;udf-name&gt;_init, and &lt;udf-name&gt;_destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
Interface functions return a value that indicates whether the operation was successful. If an operation fails, the interface function returns an error code. Otherwise, it returns TSDB_CODE_SUCCESS. The error codes are defined in `taoserror.h` and in the common API error codes in `taos.h`. For example, TSDB_CODE_UDF_INVALID_INPUT indicates invalid input. TSDB_CODE_OUT_OF_MEMORY indicates insufficient memory.
@ -194,7 +194,7 @@ typedef struct SUdfInterBuf {
```
The data structure is described as follows:
- The SUdfDataBlock block includes the number of rows (numOfRows) and the number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn.
- The SUdfDataBlock block includes the number of rows (numOfRows) and the number of columns (numCols). udfCols[i] (0 &lt;= i &lt;= numCols-1) indicates that each column is of type SUdfColumn.
- SUdfColumn includes the definition of the data type of the column (colMeta) and the data in the column (colData).
- The member definitions of SUdfColumnMeta are the same as the data type definitions in `taos.h`.
- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data.

View File

@ -186,7 +186,7 @@ The base API is used to do things like create database connections and provide a
- The variables database and len are applied by the user outside and allocated space. The current database name and length will be assigned to database and len.
- As long as the db name is not assigned to the database normally (including truncation), an error will be returned with the return value of -1, and then the user can use taos_errstr(NULL) to get error message.
- If database==NULL or len<=0, returns an error, the space required to store the db (including the last '\0') in the variable required
- If database==NULL or len&lt;=0, returns an error, the space required to store the db (including the last '\0') in the variable required
- If len is less than the space required to store the db (including the last '\0'), an error is returned. The truncated data assigned in the database ends with '\0'.
- If len is greater than or equal to the space required to store the db (including the last '\0'), return normal 0, and assign the db name ending with '\0' in the database.

View File

@ -69,7 +69,7 @@ TDengine currently supports timestamp, number, character, Boolean type, and the
| SMALLINT | i16 |
| TINYINT | i8 |
| BOOL | bool |
| BINARY | Vec<u8\> |
| BINARY | Vec&lt;u8&gt; |
| NCHAR | String |
| JSON | serde_json::Value |

View File

@ -315,7 +315,7 @@ The `connect()` function returns a `taos.TaosConnection` instance. In client-sid
All arguments to the `connect()` function are optional keyword arguments. The following are the connection parameters specified.
- `url`: The URL of taosAdapter REST service. The default is <http://localhost:6041>.
- `url`: The URL of taosAdapter REST service. The default is `http://localhost:6041`.
- `user`: TDengine user name. The default is `root`.
- `password`: TDengine user password. The default is `taosdata`.
- `timeout`: HTTP request timeout. Enter a value in seconds. The default is `socket._GLOBAL_DEFAULT_TIMEOUT`. Usually, no configuration is needed.

View File

@ -8,7 +8,7 @@ description: This document describes the TDengine PHP client library.
PHP client library relies on TDengine client driver.
Project Repository: <https://github.com/Yurunsoft/php-tdengine>
Project Repository: [https://github.com/Yurunsoft/php-tdengine](https://github.com/Yurunsoft/php-tdengine)
After TDengine client or server is installed, `taos.h` is located at:

View File

@ -68,14 +68,14 @@ TDengine supports a variety of constants:
| # | **Syntax** | **Type** | **Description** |
| --- | :-----------------------------------------------: | --------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 1 | [{+ \| -}]123 | BIGINT | Integer literals are of type BIGINT. Data that exceeds the length of the BIGINT type is truncated. |
| 1 | [+ \| -]123 | BIGINT | Integer literals are of type BIGINT. Data that exceeds the length of the BIGINT type is truncated. |
| 2 | 123.45 | DOUBLE | Floating-point literals are of type DOUBLE. Numeric values will be determined as integer or float type according to whether there is decimal point or whether scientific notation is used. |
| 3 | 1.2E3 | DOUBLE | Literals in scientific notation are of type DOUBLE. |
| 4 | 'abc' | BINARY | Content enclosed in single quotation marks is of type BINARY. The size of a BINARY is the size of the string in bytes. A literal single quote inside the string must be escaped with a backslash `\'`. |
| 5 | 'abc' | BINARY | Content enclosed in double quotation marks is of type BINARY. The size of a BINARY is the size of the string in bytes. A literal double quote inside the string must be escaped with a backslash `\"`. |
| 6 | TIMESTAMP {'literal' \| "literal"} | TIMESTAMP | The TIMESTAMP keyword indicates that the following string literal is interpreted as a timestamp. The string must be in YYYY-MM-DD HH:mm:ss.MS format. The precision is inherited from the database configuration. |
| 7 | {TRUE \| FALSE} | BOOL | Boolean literals are of type BOOL. |
| 8 | {'' \| "" \| '\t' \| "\t" \| ' ' \| " " \| NULL } | -- | The preceding characters indicate null literals. These can be used with any data type. |
| 6 | TIMESTAMP ['literal' \| "literal"] | TIMESTAMP | The TIMESTAMP keyword indicates that the following string literal is interpreted as a timestamp. The string must be in YYYY-MM-DD HH:mm:ss.MS format. The precision is inherited from the database configuration. |
| 7 | [TRUE \| FALSE] | BOOL | Boolean literals are of type BOOL. |
| 8 | ['' \| "" \| '\t' \| "\t" \| ' ' \| " " \| NULL ] | -- | The preceding characters indicate null literals. These can be used with any data type. |
:::note
Numeric values will be determined as integer or float type according to whether there is decimal point or whether scientific notation is used, so attention must be paid to avoid overflow. For example, 9999999999999999999 will be considered as overflow because it exceeds the upper limit of long integer, but 9999999999999999999.0 will be considered as a legal float number.

View File

@ -56,7 +56,7 @@ database_option: {
- WAL_FSYNC_PERIOD: specifies the interval (in milliseconds) at which data is written from the WAL to disk. This parameter takes effect only when the WAL parameter is set to 2. The default value is 3000. Enter a value between 0 and 180000. The value 0 indicates that incoming data is immediately written to disk.
- MAXROWS: specifies the maximum number of rows recorded in a block. The default value is 4096.
- MINROWS: specifies the minimum number of rows recorded in a block. The default value is 100.
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to three times of the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. TDengine Enterprise supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, thus multiple KEEP values (comma separated and up to 3 values supported, and meet keep 0 <= keep 1 <= keep 2, e.g. KEEP 100h,100d,3650d) are supported; TDengine OSS does not support Tiered Storage function (although multiple keep values are configured, they do not take effect, only the maximum keep value is used as KEEP).
- KEEP: specifies the time for which data is retained. Enter a value between 1 and 365000. The default value is 3650. The value of the KEEP parameter must be greater than or equal to three times of the value of the DURATION parameter. TDengine automatically deletes data that is older than the value of the KEEP parameter. You can use m (minutes), h (hours), and d (days) as the unit, for example KEEP 100h or KEEP 10d. If you do not include a unit, d is used by default. TDengine Enterprise supports [Tiered Storage](https://docs.tdengine.com/tdinternal/arch/#tiered-storage) function, thus multiple KEEP values (comma separated and up to 3 values supported, and meet keep 0 &lt;= keep 1 &lt;= keep 2, e.g. KEEP 100h,100d,3650d) are supported; TDengine OSS does not support Tiered Storage function (although multiple keep values are configured, they do not take effect, only the maximum keep value is used as KEEP).
- PAGES: specifies the number of pages in the metadata storage engine cache on each vnode. Enter a value greater than or equal to 64. The default value is 256. The space occupied by metadata storage on each vnode is equal to the product of the values of the PAGESIZE and PAGES parameters. The space occupied by default is 1 MB.
- PAGESIZE: specifies the size (in KB) of each page in the metadata storage engine cache on each vnode. The default value is 4. Enter a value between 1 and 16384.
- PRECISION: specifies the precision at which a database records timestamps. Enter ms for milliseconds, us for microseconds, or ns for nanoseconds. The default value is ms.

View File

@ -877,11 +877,11 @@ HISTOGRAM(expr, bin_type, bin_description, normalized)
- "user_input": "[1, 3, 5, 7]":
User specified bin values.
- "linear_bin": "{"start": 0.0, "width": 5.0, "count": 5, "infinity": true}"
- "linear_bin": "&lcub;"start": 0.0, "width": 5.0, "count": 5, "infinity": true&rcub;"
"start" - bin starting point. "width" - bin offset. "count" - number of bins generated. "infinity" - whether to add (-inf, inf) as start/end point in generated set of bins.
The above "linear_bin" descriptor generates a set of bins: [-inf, 0.0, 5.0, 10.0, 15.0, 20.0, +inf].
- "log_bin": "{"start":1.0, "factor": 2.0, "count": 5, "infinity": true}"
- "log_bin": "&lcub;"start":1.0, "factor": 2.0, "count": 5, "infinity": true&rcub;"
"start" - bin starting point. "factor" - exponential factor of bin offset. "count" - number of bins generated. "infinity" - whether to add (-inf, inf) as start/end point in generated range of bins.
The above "linear_bin" descriptor generates a set of bins: [-inf, 1.0, 2.0, 4.0, 8.0, 16.0, +inf].
- normalized: setting to 1/0 to turn on/off result normalization. Valid values are 0 or 1.
@ -977,7 +977,7 @@ ignore_null_values: {
- `INTERP` is used to get the value that matches the specified time slice from a column. If no such value exists an interpolation value will be returned based on `FILL` parameter.
- The input data of `INTERP` is the value of the specified column and a `where` clause can be used to filter the original data. If no `where` condition is specified then all original data is the input.
- `INTERP` must be used along with `RANGE`, `EVERY`, `FILL` keywords.
- The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 <= timestamp2. timestamp1 is the starting point of the output time range. timestamp2 is the ending point of the output time range.
- The output time range of `INTERP` is specified by `RANGE(timestamp1,timestamp2)` parameter, with timestamp1 &lt;= timestamp2. timestamp1 is the starting point of the output time range. timestamp2 is the ending point of the output time range.
- The number of rows in the result set of `INTERP` is determined by the parameter `EVERY(time_unit)`. Starting from timestamp1, one interpolation is performed for every time interval specified `time_unit` parameter. The parameter `time_unit` must be an integer, with no quotes, with a time unit of: a(millisecond)), s(second), m(minute), h(hour), d(day), or w(week). For example, `EVERY(500a)` will interpolate every 500 milliseconds.
- Interpolation is performed based on `FILL` parameter. For more information about FILL clause, see [FILL Clause](../distinguished/#fill-clause).
- When only one timestamp value is specified in `RANGE` clause, `INTERP` is used to generate interpolation at this point in time. In this case, `EVERY` clause can be omitted. For example, SELECT INTERP(col) FROM tb RANGE('2023-01-01 00:00:00') FILL(linear).

View File

@ -35,9 +35,9 @@ TDengine supports the `UNION` and `UNION ALL` operations. UNION ALL collects all
| # | **Operator** | **Supported Data Types** | **Description** |
| --- | :---------------: | -------------------------------------------------------------------- | -------------------- |
| 1 | = | All types except BLOB, MEDIUMBLOB, and JSON | Equal to |
| 2 | <\>, != | All types except BLOB, MEDIUMBLOB, and JSON; the primary key (timestamp) is also not supported | Not equal to |
| 3 | \>, < | All types except BLOB, MEDIUMBLOB, and JSON | Greater than and less than |
| 4 | \>=, <= | All types except BLOB, MEDIUMBLOB, and JSON | Greater than or equal to and less than or equal to |
| 2 | &lt;&gt;, != | All types except BLOB, MEDIUMBLOB, and JSON; the primary key (timestamp) is also not supported | Not equal to |
| 3 | &gt;, &lt; | All types except BLOB, MEDIUMBLOB, and JSON | Greater than and less than |
| 4 | &gt;=, &lt;= | All types except BLOB, MEDIUMBLOB, and JSON | Greater than or equal to and less than or equal to |
| 5 | IS [NOT] NULL | All types | Indicates whether the value is null |
| 6 | [NOT] BETWEEN AND | All types except BLOB, MEDIUMBLOB, JSON and GEOMETRY | Closed interval comparison |
| 7 | IN | All types except BLOB, MEDIUMBLOB, and JSON; the primary key (timestamp) is also not supported | Equal to any value in the list |

View File

@ -71,7 +71,7 @@ The following data types can be used in the schema for standard tables.
| 44 | SHOW STREAMS | Modified | This statement previously showed continuous queries. The continuous query feature has been replaced with the stream processing feature. This statement now shows streams that have been created.
| 45 | SHOW SUBSCRIPTIONS | Added | Shows all subscriptions in the current database.
| 46 | SHOW TABLES | Modified | Only shows table names.
| 47 | SHOW TABLE DISTRIBUTED | Added | Shows how table data is distributed. This replaces the `SELECT _block_dist() FROM { tb_name | stb_name }` command.
| 47 | SHOW TABLE DISTRIBUTED | Added | Shows how table data is distributed. This replaces the `SELECT _block_dist() FROM &lcub; tb_name | stb_name &rcub;` command.
| 48 | SHOW TOPICS | Added | Shows all subscribed topics in the current database.
| 49 | SHOW TRANSACTIONS | Added | Shows all running transactions in the system.
| 50 | SHOW DNODE VARIABLES | Added | Shows the configuration of the specified dnode.

View File

@ -15,7 +15,7 @@ Diagnostic steps:
2. On the server side, execute command `taos -n server -P <port> -l <pktlen>` to monitor the port range starting from the port specified by `-P` parameter with the role of "server".
3. On the client side, execute command `taos -n client -h <fqdn of server> -P <port> -l <pktlen>` to send a testing package to the specified server and port.
-l <pktlen\>: The size of the testing package, in bytes. The value range is [11, 64,000] and default value is 1,000.
-l &lt;pktlen&gt;: The size of the testing package, in bytes. The value range is [11, 64,000] and default value is 1,000.
Please note that the package length must be same in the above 2 commands executed on server side and client side respectively.
Output of the server side for the example is below:
@ -63,7 +63,7 @@ Once this parameter is set to 135 or 143, the log file grows very quickly especi
## Client Log
An independent log file, named as "taoslog+<seq num\>" is generated for each client program, i.e. a client process. The parameter `debugFlag` is used to control the log level. The default value is 131. For debugging and tracing, it needs to be set to either 135 or 143 respectively.
An independent log file, named as "taoslog+&lt;seq num&gt;" is generated for each client program, i.e. a client process. The parameter `debugFlag` is used to control the log level. The default value is 131. For debugging and tracing, it needs to be set to either 135 or 143 respectively.
The default value of `debugFlag` is also 131 and only logs at level of INFO/ERROR/WARNING are recorded. As stated above, for debugging and tracing, it needs to be changed to 135 or 143 respectively, so that logs at DEBUG or TRACE level can be recorded.

View File

@ -81,7 +81,7 @@ Parameter Description:
:::note
URL Encoding. Make sure that parameters are properly encoded. For example, when specifying a timezone you must properly encode special characters. ?tz=Etc/GMT+10 will not work because the <+> plus symbol is recognized as a space in the url. It's best practice to encode all special characters in a parameter. Instead use ?tz=Etc%2FGMT%2B10 for the parameter.
URL Encoding. Make sure that parameters are properly encoded. For example, when specifying a timezone you must properly encode special characters. ?tz=Etc/GMT+10 will not work because the + plus symbol is recognized as a space in the url. It's best practice to encode all special characters in a parameter. Instead use ?tz=Etc%2FGMT%2B10 for the parameter.
:::

View File

@ -166,8 +166,8 @@ See [example/config/taosadapter.toml](https://github.com/taosdata/taosadapter/bl
- Compatible with InfluxDB v1 write interface
[https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/](https://docs.influxdata.com/influxdb/v2.0/reference/api/influxdb-1x/write/)
- Compatible with OpenTSDB JSON and telnet format writes
- <http://opentsdb.net/docs/build/html/api_http/put.html>
- <http://opentsdb.net/docs/build/html/api_telnet/put.html>
- [http://opentsdb.net/docs/build/html/api_http/put.html](http://opentsdb.net/docs/build/html/api_http/put.html)
- [http://opentsdb.net/docs/build/html/api_telnet/put.html](http://opentsdb.net/docs/build/html/api_telnet/put.html)
- Seamless connection to collectd
collectd is a system statistics collection daemon, please visit [https://collectd.org/](https://collectd.org/) for more information.
- Seamless connection with StatsD

View File

@ -94,67 +94,67 @@ taosBenchmark -f <json file>
## Command-line argument in detail
- **-f/--file <json file\>** :
- **-f/--file &lt;json file&gt;** :
specify the configuration file to use. This file includes All parameters. Users should not use this parameter with other parameters on the command-line. There is no default value.
- **-c/--config-dir <dir\>** :
- **-c/--config-dir &lt;dir&gt;** :
specify the directory where the TDengine cluster configuration file. The default path is `/etc/taos`.
- **-h/--host <host\>** :
- **-h/--host &lt;host&gt;** :
Specify the FQDN of the TDengine server to connect to. The default value is localhost.
- **-P/--port <port\>** :
- **-P/--port &lt;port&gt;** :
The port number of the TDengine server to connect to, the default value is 6030.
- **-I/--interface <insertMode\>** :
- **-I/--interface &lt;insertMode&gt;** :
Insert mode. Options are taosc, rest, stmt, sml, sml-rest, corresponding to normal write, restful interface writing, parameter binding interface writing, schemaless interface writing, RESTful schemaless interface writing (provided by taosAdapter). The default value is taosc.
- **-u/--user <user\>** :
- **-u/--user &lt;user&gt;** :
User name to connect to the TDengine server. Default is root.
- **-U/--supplement-insert ** :
Supplementally insert data without create database and table, optional, default is off.
- **-p/--password <passwd\>** :
- **-p/--password &lt;passwd&gt;** :
The default password to connect to the TDengine server is `taosdata`.
- **-o/--output <file\>** :
- **-o/--output &lt;file&gt;** :
specify the path of the result output file, the default value is `. /output.txt`.
- **-T/--thread <threadNum\>** :
- **-T/--thread &lt;threadNum&gt;** :
The number of threads to insert data. Default is 8.
- **-B/--interlace-rows <rowNum\>** :
- **-B/--interlace-rows &lt;rowNum&gt;** :
Enables interleaved insertion mode and specifies the number of rows of data to be inserted into each child table. Interleaved insertion mode means inserting the number of rows specified by this parameter into each sub-table and repeating the process until all sub-tables have been inserted. The default value is 0, i.e., data is inserted into one sub-table before the next sub-table is inserted.
- **-i/--insert-interval <timeInterval\>** :
- **-i/--insert-interval &lt;timeInterval&gt;** :
Specify the insert interval in `ms` for interleaved insert mode. The default value is 0. It only works if `-B/--interlace-rows` is greater than 0. After inserting interlaced rows for each child table, the data insertion thread will wait for the interval specified by this value before proceeding to the next round of writes.
- **-r/--rec-per-req <rowNum\>** :
- **-r/--rec-per-req &lt;rowNum&gt;** :
Writing the number of rows of records per request to TDengine, the default value is 30000.
- **-t/--tables <tableNum\>** :
- **-t/--tables &lt;tableNum&gt;** :
Specify the number of sub-tables. The default is 10000.
- **-S/--timestampstep <stepLength\>** :
- **-S/--timestampstep &lt;stepLength&gt;** :
Timestamp step for inserting data in each child table in ms, default is 1.
- **-n/--records <recordNum\>** :
- **-n/--records &lt;recordNum&gt;** :
The default value of the number of records inserted in each sub-table is 10000.
- **-d/--database <dbName\>** :
- **-d/--database &lt;dbName&gt;** :
The name of the database used, the default value is `test`.
- **-b/--data-type <colType\>** :
- **-b/--data-type &lt;colType&gt;** :
specify the type of the data columns of the super table. It defaults to three columns of type FLOAT, INT, and FLOAT if not used.
- **-l/--columns <colNum\>** :
- **-l/--columns &lt;colNum&gt;** :
specify the number of columns in the super table. If both this parameter and `-b/--data-type` is set, the final result number of columns is the greater of the two. If the number specified by this parameter is greater than the number of columns specified by `-b/--data-type`, the unspecified column type defaults to INT, for example: `-l 5 -b float,double`, then the final column is `FLOAT,DOUBLE,INT,INT,INT`. If the number of columns specified is less than or equal to the number of columns specified by `-b/--data-type`, then the result is the column and type specified by `-b/--data-type`, e.g.: `-l 3 -b float,double,float,bigint`. The last column is `FLOAT,DOUBLE, FLOAT,BIGINT`.
- **-L/--partial-col-num <colNum\> ** :
- **-L/--partial-col-num &lt;colNum&gt; ** :
Specify first numbers of columns has data. Rest of columns' data are NULL. Default is all columns have data.
- **-A/--tag-type <tagType\>** :
- **-A/--tag-type &lt;tagType&gt;** :
The tag column type of the super table. nchar and binary types can both set the length, for example:
```
@ -168,10 +168,10 @@ Note: In some shells, such as bash, "()" needs to be escaped, so the above comma
taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
```
- **-w/--binwidth <length\>**:
- **-w/--binwidth &lt;length&gt;**:
specify the default length for nchar and binary types. The default value is 64.
- **-m/--table-prefix <tablePrefix\>** :
- **-m/--table-prefix &lt;tablePrefix&gt;** :
The prefix of the sub-table name, the default value is "d".
- **-E/--escape-character** :
@ -192,25 +192,25 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
- **-y/--answer-yes** :
Switch parameter that requires the user to confirm at the prompt to continue. The default value is false.
- **-O/--disorder <Percentage\>** :
- **-O/--disorder &lt;Percentage&gt;** :
Specify the percentage probability of disordered data, with a value range of [0,50]. The default is 0, i.e., there is no disordered data.
- **-R/--disorder-range <timeRange\>** :
- **-R/--disorder-range &lt;timeRange&gt;** :
Specify the timestamp range for the disordered data. It leads the resulting disorder timestamp as the ordered timestamp minus a random value in this range. Valid only if the percentage of disordered data specified by `-O/--disorder` is greater than 0.
- **-F/--prepared_rand <Num\>** :
- **-F/--prepared_rand &lt;Num&gt;** :
Specify the number of unique values in the generated random data. A value of 1 means that all data are equal. The default value is 10000.
- **-a/--replica <replicaNum\>** :
- **-a/--replica &lt;replicaNum&gt;** :
Specify the number of replicas when creating the database. The default value is 1.
- **-k/--keep-trying <NUMBER\>** :
- **-k/--keep-trying &lt;NUMBER&gt;** :
Keep trying if failed to insert, default is no. Available with v3.0.9+.
- **-z/--trying-interval <NUMBER\>** :
- **-z/--trying-interval &lt;NUMBER&;gt;** :
Specify interval between keep trying insert. Valid value is a positive number. Only valid when keep trying be enabled. Available with v3.0.9+.
- **-v/--vgroups <NUMBER\>** :
- **-v/--vgroups &lt;NUMBER&gt;** :
Specify vgroups number for creating a database, only valid with daemon version 3.0+
- **-V/--version** :
@ -226,7 +226,7 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\)
The parameters listed in this section apply to all function modes.
- **filetype** : The function to be tested, with optional values `insert`, `query` and `subscribe`. These correspond to the insert, query, and subscribe functions, respectively. Users can specify only one of these in each configuration file.
**cfgdir**: specify the TDengine client configuration file's directory. The default path is /etc/taos.
**cfgdir**: specify the TDengine client configuration file's directory. The default path is `/etc/taos`.
- **host**: Specify the FQDN of the TDengine server to connect. The default value is `localhost`.

View File

@ -289,7 +289,7 @@ A specific type "nchar" is provided in TDengine to store non-ASCII characters su
The characters input on the client side are encoded using the default system encoding, which is UTF-8 on Linux/macOS, or GB18030 or GBK on some systems in Chinese, POSIX in docker, CP936 on Windows in Chinese. The encoding of the operating system in use must be set correctly so that the characters in nchar type can be converted to UCS4-LE.
The locale definition standard on Linux/macOS is: <Language\>\_<Region\>.<charset\>, for example, in "zh_CN.UTF-8", "zh" means Chinese, "CN" means China mainland, "UTF-8" means charset. The charset indicates how to display the characters. On Linux/macOS, the charset can be set by locale in the system. On Windows system another configuration parameter `charset` must be used to configure charset because the locale used on Windows is not POSIX standard. Of course, `charset` can also be used on Linux/macOS to specify the charset.
The locale definition standard on Linux/macOS is: &lt;Language&gt;\_&lt;Region&gt;.&lt;charset&gt;, for example, in "zh_CN.UTF-8", "zh" means Chinese, "CN" means China mainland, "UTF-8" means charset. The charset indicates how to display the characters. On Linux/macOS, the charset can be set by locale in the system. On Windows system another configuration parameter `charset` must be used to configure charset because the locale used on Windows is not POSIX standard. Of course, `charset` can also be used on Linux/macOS to specify the charset.
:::

View File

@ -36,7 +36,7 @@ LoadPlugin network
</Plugin>
```
where <taosAdapter's host\> fills in the server's domain name or IP address running taosAdapter. <port for collectd direct\> fills in the port that taosAdapter uses to receive collectd data (default is 6045).
where &lt;taosAdapter's host&gt; fills in the server's domain name or IP address running taosAdapter. &lt;port for collectd direct&gt; fills in the port that taosAdapter uses to receive collectd data (default is 6045).
An example is as follows.
@ -62,7 +62,7 @@ LoadPlugin write_tsdb
</Plugin>
```
Where <taosAdapter's host\> is the domain name or IP address of the server running taosAdapter. <port for collectd write_tsdb plugin\> Fill in the data that taosAdapter uses to receive the collectd write_tsdb plugin (default is 6047).
Where &lt;taosAdapter's host&gt; is the domain name or IP address of the server running taosAdapter. &lt;port for collectd write_tsdb plugin&gt; Fill in the data that taosAdapter uses to receive the collectd write_tsdb plugin (default is 6047).
```text
LoadPlugin write_tsdb

View File

@ -26,7 +26,7 @@ The default database name written by the taosAdapter is `icinga2`. You can also
### Configure icinga3
- Enable opentsdb-writer for icinga2 (refer to the link https://icinga.com/docs/icinga-2/latest/doc/14-features/#opentsdb-writer)
- Modify the configuration file `/etc/icinga2/features-enabled/opentsdb.conf` by filling in <taosAdapter's host\> as the domain name or IP address of the server running taosAdapter and <port for icinga2\> as the corresponding port on which taosAdapter supports receiving icinga2 data (default is 6048)
- Modify the configuration file `/etc/icinga2/features-enabled/opentsdb.conf` by filling in &lt;taosAdapter's host&gt; as the domain name or IP address of the server running taosAdapter and &lt;port for icinga2&gt; as the corresponding port on which taosAdapter supports receiving icinga2 data (default is 6048)
```
object OpenTsdbWriter "opentsdb" {

View File

@ -9,8 +9,8 @@ Point the `remote_read url` and `remote_write url` to the domain name or IP addr
### Configure Basic authentication
- username: <TDengine's username>
- password: <TDengine's password>
- username: TDengine's username
- password: TDengine's password
### Example configuration of remote_write and remote_read related sections in prometheus.yml file

View File

@ -31,7 +31,7 @@ The default database name written by taosAdapter is `statsd`. To specify a diffe
### Configuring StatsD
To use StatsD, you need to download its [source code](https://github.com/statsd/statsd). Please refer to the example file `exampleConfig.js` in the root directory of the source download to modify the configuration file. In <taosAdapter's host\>, please fill in the domain name or IP address of the server running taosAdapter, and <port for StatsD\>, please fill in the port where taosAdapter receives StatsD data (default is 6044).
To use StatsD, you need to download its [source code](https://github.com/statsd/statsd). Please refer to the example file `exampleConfig.js` in the root directory of the source download to modify the configuration file. In &lt;taosAdapter's host&gt;, please fill in the domain name or IP address of the server running taosAdapter, and &lt;port for StatsD&gt;, please fill in the port where taosAdapter receives StatsD data (default is 6044).
```
backends section add ". /backends/repeater"

View File

@ -10,7 +10,7 @@ In the Telegraf configuration file (default location `/etc/telegraf/telegraf.con
...
```
Where <taosAdapter's host\> please fill in the server's domain name or IP address running the taosAdapter service. <REST service port\> please fill in the port of the REST service (default is 6041). <TDengine's username\> and <TDengine's password\> please fill in the actual configuration of the currently running TDengine. And <database name\> please fill in the database name where you want to store Telegraf data in TDengine.
Where &lt;taosAdapter's host&gt; please fill in the server's domain name or IP address running the taosAdapter service. &lt;REST service port&gt; please fill in the port of the REST service (default is 6041). &lt;TDengine's username&gt; and &lt;TDengine's password&gt; please fill in the actual configuration of the currently running TDengine. And &lt;database name&gt; please fill in the database name where you want to store Telegraf data in TDengine.
An example is as follows.

View File

@ -23,7 +23,7 @@ Record these values:
## Installing Grafana
TDengine currently supports Grafana versions 7.5 and above. Users can go to the Grafana official website to download the installation package and execute the installation according to the current operating system. The download address is as follows: <https://grafana.com/grafana/download>.
TDengine currently supports Grafana versions 7.5 and above. Users can go to the Grafana official website to download the installation package and execute the installation according to the current operating system. The download address is as follows: [https://grafana.com/grafana/download](https://grafana.com/grafana/download).
## Configuring Grafana
@ -59,7 +59,7 @@ bash -c "$(curl -fsSL \
-p taosdata
```
Restart Grafana service and open Grafana in web-browser, usually <http://localhost:3000>.
Restart Grafana service and open Grafana in web-browser, usually `http://localhost:3000`.
Save the script and type `./install.sh --help` for the full usage of the script.
@ -181,7 +181,7 @@ You can setup a zero-configuration stack for TDengine + Grafana by [docker-compo
3. Start TDengine and Grafana services: `docker-compose up -d`.
Open Grafana <http://localhost:3000>, and you can add dashboard with TDengine now.
Open Grafana (http://localhost:3000), and you can add dashboard with TDengine now.
</TabItem>
</Tabs>
@ -202,7 +202,7 @@ As shown above, select the `TDengine` data source in the `Query` and enter the c
:::note
Since the REST connection because is stateless. Grafana plugin can use <db_name>.<table_name> in the SQL command to specify the database name.
Since the REST connection because is stateless. Grafana plugin can use &lt;db_name&gt;.&lt;table_name&gt; in the SQL command to specify the database name.
:::

View File

@ -345,7 +345,7 @@ The following configuration items apply to TDengine Sink Connector and TDengine
### TDengine Sink Connector specific configuration
1. `connection.database`: The name of the target database. If the specified database does not exist, it will be created automatically. The time precision used for automatic library building is nanoseconds. The default value is null. When it is NULL, refer to the description of the `connection.database.prefix` parameter for the naming rules of the target database
2. `connection.database.prefix`: When `connection.database` is null, the prefix of the target database. Can contain placeholder '${topic}'. For example, kafka_${topic}, for topic 'orders' will be written to database 'kafka_orders'. Default null. When null, the name of the target database is the same as the name of the topic.
2. `connection.database.prefix`: When `connection.database` is null, the prefix of the target database. Can contain placeholder '$&lcub;topic&rcub;'. For example, kafka_$&lcub;topic&rcub;, for topic 'orders' will be written to database 'kafka_orders'. Default null. When null, the name of the target database is the same as the name of the topic.
3. `batch.size`: Write the number of records in each batch in batches. When the data received by the sink connector at one time is larger than this value, it will be written in some batches.
4. `max.retries`: The maximum number of retries when an error occurs. Defaults to 1.
5. `retry.backoff.ms`: The time interval for retry when sending an error. The unit is milliseconds. The default is 3000.
@ -370,12 +370,12 @@ The following configuration items apply to TDengine Sink Connector and TDengine
## Other notes
1. To use Kafka Connect, refer to <https://kafka.apache.org/documentation/#connect>.
1. To use Kafka Connect, refer to [https://kafka.apache.org/documentation/#connect](https://kafka.apache.org/documentation/#connect).
## Feedback
<https://github.com/taosdata/kafka-connect-tdengine/issues>
[https://github.com/taosdata/kafka-connect-tdengine/issues](https://github.com/taosdata/kafka-connect-tdengine/issues)
## Reference
1. For more information, see <https://kafka.apache.org/documentation/>
1. For more information, see [https://kafka.apache.org/documentation/](https://kafka.apache.org/documentation/).

View File

@ -217,6 +217,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CHKPT, "stream-req-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
@ -301,7 +302,6 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)

View File

@ -462,7 +462,6 @@ struct SStreamTask {
struct SStreamMeta* pMeta;
SSHashObj* pNameMap;
void* pBackend;
int64_t backendRefId;
char reserve[256];
};
@ -640,6 +639,7 @@ typedef struct {
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
int64_t streamId;
int64_t checkpointId;
@ -648,6 +648,7 @@ typedef struct {
SEpSet mgmtEps;
int32_t mnodeId;
int32_t transId;
int8_t mndTrigger;
int64_t expireTime;
} SStreamCheckpointSourceReq;
@ -770,6 +771,15 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SStreamTaskCheckpointReq;
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
@ -787,11 +797,12 @@ bool streamTaskShouldPause(const SStreamTask* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask);
const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask);
const char* streamTaskGetStatusStr(ETaskStatus status);
void streamTaskResetStatus(SStreamTask* pTask);
void streamTaskSetStatusReady(SStreamTask* pTask);
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
@ -806,7 +817,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask);
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn);
@ -839,6 +850,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
@ -866,6 +878,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -955,7 +955,6 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
taosArrayPush(pRequest->tableList, &pName);
pCreateReq->flags |= TD_CREATE_IF_NOT_EXISTS;
// change tag cid to new cid
@ -966,6 +965,12 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
// pCreateReq->ctb.suid = processSuid(pCreateReq->ctb.suid, pRequest->pDb);
toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName);
code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta);
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
code = TSDB_CODE_SUCCESS;
taosMemoryFreeClear(pTableMeta);
continue;
}
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
@ -983,6 +988,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
}
taosMemoryFreeClear(pTableMeta);
}
taosArrayPush(pRequest->tableList, &pName);
SVgroupCreateTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId));
if (pTableBatch == NULL) {
@ -999,6 +1005,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
}
}
if (taosHashGetSize(pVgroupHashmap) == 0) {
goto end;
}
SArray* pBufArray = serializeVgroupsCreateTableBatch(pVgroupHashmap);
if (NULL == pBufArray) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -223,6 +223,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -84,7 +84,6 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -835,7 +835,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -17,11 +17,15 @@
#define _TD_MND_STREAM_H_
#include "mndInt.h"
#include "mndTrans.h"
#ifdef __cplusplus
extern "C" {
#endif
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_VER_NUMBER 4
typedef struct SStreamTransInfo {
int64_t startTime;
int64_t streamUid;
@ -50,8 +54,22 @@ typedef struct SStreamExecInfo {
SHashObj *pTaskMap;
SArray *pTaskList;
TdThreadMutex lock;
SHashObj *pTransferStateStreams;
} SStreamExecInfo;
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
int32_t transId;
} SFailedCheckpointInfo;
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
@ -67,7 +85,7 @@ void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
@ -79,7 +97,22 @@ int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
#ifdef __cplusplus
}

View File

@ -17,6 +17,8 @@
#include "mndDef.h"
#include "mndConsumer.h"
static void *freeStreamTasks(SArray *pTaskLevel);
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
@ -121,11 +123,18 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
pObj->tasks = NULL;
if (pObj->tasks != NULL) {
pObj->tasks = freeStreamTasks(pObj->tasks);
}
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (tDecodeI32(pDecoder, &sz) < 0) {
return -1;
}
if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
@ -165,8 +174,9 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0;
}
static void *freeStreamTasks(SArray *pTaskLevel) {
void *freeStreamTasks(SArray *pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i);
int32_t taskSz = taosArrayGetSize(pLevel);

View File

@ -545,6 +545,7 @@ void dumpHeader(SSdb *pSdb, SJson *json) {
SJson *maxIdsJson = tjsonCreateObject();
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
for (int32_t i = 0; i < SDB_MAX; ++i) {
if(i == 5) continue;
int64_t maxId = 0;
if (i < SDB_MAX) {
maxId = pSdb->maxId[i];

View File

@ -767,7 +767,7 @@ _OVER:
pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE ||
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER) {
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state));
return -1;

View File

@ -639,7 +639,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndPersistStream(pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreate->name,
@ -872,7 +872,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
}
// drop stream
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
mError("stream:%s, failed to drop log since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
@ -923,7 +923,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
goto _OVER;
}
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_DROPPED) < 0) {
mndReleaseStream(pMnode, pStream);
goto _OVER;
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,336 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndStream.h"
#include "mndTrans.h"
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
saveStreamTasksInfo(pStream, &execInfo);
sdbRelease(pSdb, pStream);
}
}
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);
pNodeEntry->stageUpdated = true;
pTaskEntry->stage = stage;
break;
}
}
}
static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) {
int32_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SFailedCheckpointInfo* p = taosArrayGet(pList, i);
if (p->transId == pInfo->transId) {
return;
}
}
taosArrayPush(pList, pInfo);
}
static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint");
if (pTrans == NULL) {
return terrno;
}
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
taosWUnLockLatch(&pStream->lock);
return terrno;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
continue;
}
if (!hasEpset) {
taosMemoryFree(pReq);
continue;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
return terrno;
}
}
}
taosWUnLockLatch(&pStream->lock);
int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
mndKillTransImpl(pMnode, transId, "");
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
if (pStream == NULL) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
} else {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetSTbName);
} else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream);
}
}
mndReleaseStream(pMnode, pStream);
return code;
}
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
for (int k = 0; k < num; ++k) {
int32_t *pVgId = taosArrayGet(pNodeList, k);
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
break;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void* pIter = NULL;
while(1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
if(pStream->status != STREAM_STATUS__PAUSE){
SMPauseStreamReq reqPause = {0};
strcpy(reqPause.name, pStream->name);
reqPause.igNotExists = 1;
int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
void * pHead = rpcMallocCont(contLen);
tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_PAUSE_STREAM,
.pCont = pHead,
.contLen = contLen,
.info = *info,
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mInfo("receivee pause stream:%s, %s, %p, because grant expired", pStream->name, reqPause.name, reqPause.name);
}
sdbRelease(pSdb, pStream);
}
return 0;
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
if(grantCheck(TSDB_GRANT_STREAMS) < 0){
if(suspendAllStreams(pMnode, &pReq->info) < 0){
return -1;
}
}
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
streamMetaClearHbMsg(&req);
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
tDecoderClear(&decoder);
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
if (numOfExisted == 0) {
doExtractTasksFromStream(pMnode);
}
initStreamNodeList(pMnode);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
setNodeEpsetExpiredFlag(req.pUpdateNodes);
}
bool snodeChanged = false;
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
continue;
}
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if (pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
if (!pTaskEntry->inputQChanging) {
pTaskEntry->inputQUnchangeCounter++;
} else {
pTaskEntry->inputQChanging = false;
}
} else {
pTaskEntry->inputQChanging = true;
pTaskEntry->inputQUnchangeCounter = 0;
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->checkpointId != 0) {
if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
addIntoCheckpointList(pList, &info);
}
}
}
if (p->status == pTaskEntry->status) {
pTaskEntry->statusLastDuration++;
} else {
pTaskEntry->status = p->status;
pTaskEntry->statusLastDuration = 0;
}
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
}
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pList) > 0) {
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId);
}
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
}
}
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
taosArrayDestroy(pList);
return TSDB_CODE_SUCCESS;
}

View File

@ -160,3 +160,106 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj* pStream, int64_t checkpointId)
return TSDB_CODE_SUCCESS;
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
mndTransDrop(pTrans);
return NULL;
}
terrno = 0;
return pTrans;
}
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
int32_t size = sizeof(int32_t) + tlen + MND_STREAM_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_STREAM, MND_STREAM_VER_NUMBER, size);
if (pRaw == NULL) goto STREAM_ENCODE_OVER;
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto STREAM_ENCODE_OVER;
tEncoderInit(&encoder, buf, tlen);
if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tEncoderClear(&encoder);
goto STREAM_ENCODE_OVER;
}
tEncoderClear(&encoder);
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, tlen, STREAM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, STREAM_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, STREAM_ENCODE_OVER);
terrno = TSDB_CODE_SUCCESS;
STREAM_ENCODE_OVER:
taosMemoryFreeClear(buf);
if (terrno != TSDB_CODE_SUCCESS) {
mError("stream:%s, failed to encode to raw:%p since %s", pStream->name, pRaw, terrstr());
sdbFreeRaw(pRaw);
return NULL;
}
mTrace("stream:%s, encode to raw:%p, row:%p, checkpoint:%" PRId64 "", pStream->name, pRaw, pStream,
pStream->checkpointId);
return pRaw;
}
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) {
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL) {
mError("failed to encode stream since %s", terrstr());
mndTransDrop(pTrans);
return -1;
}
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
sdbFreeRaw(pCommitRaw);
mndTransDrop(pTrans);
return -1;
}
if (sdbSetRawStatus(pCommitRaw, status) != 0) {
mError("stream trans:%d failed to set raw status:%d since %s", pTrans->id, status, terrstr());
sdbFreeRaw(pCommitRaw);
mndTransDrop(pTrans);
return -1;
}
return 0;
}
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode) {
pAction->epSet = *pEpset;
pAction->contLen = contLen;
pAction->pCont = pCont;
pAction->msgType = msgType;
pAction->retryCode = retryCode;
}

View File

@ -0,0 +1,281 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndStream.h"
#include "mndTrans.h"
#include "tmisce.h"
#include "mndVgroup.h"
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) {
break;
}
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
// if not all ready till now, no need to check the remaining vgroups.
if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) {
if (!pVgroup->vnodeGid[i].syncRestore) {
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false;
break;
}
ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false;
break;
}
}
}
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pVgroup);
}
SSnodeObj *pObj = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
SNodeEntry entry = {0};
addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
entry.nodeId = SNODE_HANDLE;
char buf[256] = {0};
EPSET_TO_STR(&entry.epset, buf);
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
taosArrayPush(pVgroupListSnapshot, &entry);
sdbRelease(pSdb, pObj);
}
return pVgroupListSnapshot;
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->uid == streamId) {
sdbCancelFetch(pSdb, pIter);
return pStream;
}
sdbRelease(pSdb, pStream);
}
return NULL;
}
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
}
}
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId) {
*hasEpset = false;
pEpSet->numOfEps = 0;
if (nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL;
void *pIter = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter != NULL) {
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
sdbCancelFetch(pMnode->pSdb, pIter);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mError("failed to acquire snode epset");
return TSDB_CODE_INVALID_PARA;
}
} else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
if (pVgObj != NULL) {
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
epsetAssign(pEpSet, &epset);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
return TSDB_CODE_SUCCESS;
}
}
}
static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfLevels = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfLevels; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (pTask->id.taskId == pId->taskId) {
return pTask;
}
}
}
return NULL;
}
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
int32_t num = 0;
for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
SArray* pLevel = taosArrayGetP(pStream->tasks, i);
num += taosArrayGetSize(pLevel);
}
return num;
}
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup);
}
}
}
return 0;
}
static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
taosMemoryFree(pReq);
return TSDB_CODE_SUCCESS;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SArray *tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
}
return 0;
}

View File

@ -235,7 +235,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);

View File

@ -886,7 +886,8 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
pTask->execInfo.step2Start = taosGetTimestampMs();
if (done) {
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer,
pRange->maxVer, 0.0);
streamTaskPutTranstateIntoInputQ(pTask);
streamExecTask(pTask); // exec directly
} else {
@ -1141,8 +1142,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
req.taskId);
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
@ -1169,18 +1169,22 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId);
if (req.mndTrigger == 1) {
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure",
pTask->id.idStr, req.checkpointId);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
} else {
ASSERT(status == TASK_STATUS__HALT);
}
// check if the checkpoint msg already sent or not.
@ -1198,16 +1202,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
streamProcessCheckpointSourceReq(pTask, &req);
taosThreadMutexUnlock(&pTask->lock);
int32_t total = 0;
streamMetaWLock(pMeta);
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
total = pMeta->numOfStreamTasks;
streamMetaWUnLock(pMeta);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) {
@ -1233,35 +1229,3 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
}
// NOTE: here we may receive this message more than once, so need to handle this case
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr);
if (pTask->hTaskInfo.id.taskId == 0) {
tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
taosThreadMutexLock(&pTask->lock);
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
taosThreadMutexUnlock(&pTask->lock);
// clear the scheduler status
streamTaskSetSchedStatusInactive(pTask);
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -465,7 +465,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver,
pReader->nextBlk, numOfBlocks, idstr);
(pReader->nextBlk + 1), numOfBlocks, idstr);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {

View File

@ -663,7 +663,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHTaskId = &pTask->hTaskInfo.id;
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId);
tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId,
(int32_t)pHTaskId->taskId);
}
streamMetaReleaseTask(pMeta, pTask);
}

View File

@ -600,11 +600,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_STREAM_HTASK_DROP: {
if (pVnode->restored && vnodeIsLeader(pVnode)) {
tqProcessTaskDropHTask(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -2142,7 +2142,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTSInfo->base.cond.startVersion = pStreamInfo->fillHistoryVer.minVer;
pTSInfo->base.cond.endVersion = pStreamInfo->fillHistoryVer.maxVer;
pTSInfo->base.cond.twindows = pStreamInfo->fillHistoryWindow;
qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
qDebug("stream scan step2 (scan wal), verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 "-%" PRId64 ", %s",
pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey,
pTSInfo->base.cond.twindows.ekey, id);
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;

View File

@ -1305,10 +1305,13 @@ static bool validateStateOper(const SValueNode* pVal) {
if (TSDB_DATA_TYPE_BINARY != pVal->node.resType.type) {
return false;
}
return (
0 == strncasecmp(varDataVal(pVal->datum.p), "GT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "GE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "LT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "LE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "EQ", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "NE", 2));
if (strlen(varDataVal(pVal->datum.p)) == 2) {
return (
0 == strncasecmp(varDataVal(pVal->datum.p), "GT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "GE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "LT", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "LE", 2) ||
0 == strncasecmp(varDataVal(pVal->datum.p), "EQ", 2) || 0 == strncasecmp(varDataVal(pVal->datum.p), "NE", 2));
}
return false;
}
static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
@ -3737,7 +3740,11 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateTbUidColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
#ifdef BUILD_NO_CALL
.sprocessFunc = qTbUidFunction,
#else
.sprocessFunc = NULL,
#endif
.finalizeFunc = NULL
},
{
@ -3747,7 +3754,11 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateVgIdColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
#ifdef BUILD_NO_CALL
.sprocessFunc = qVgIdFunction,
#else
.sprocessFunc = NULL,
#endif
.finalizeFunc = NULL
},
{

View File

@ -1788,6 +1788,7 @@ bool getTimePseudoFuncEnv(SFunctionNode *UNUSED_PARAM(pFunc), SFuncExecEnv *pEnv
return true;
}
#ifdef BUILD_NO_CALL
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
colDataSetInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0));
return TSDB_CODE_SUCCESS;
@ -1797,6 +1798,7 @@ int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
colDataSetInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 1));
return TSDB_CODE_SUCCESS;
}
#endif
int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
colDataSetInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 2));
@ -1824,7 +1826,7 @@ int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
#ifdef BUILD_NO_CALL
int32_t qTbUidFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
char* p = colDataGetNumData(pInput->columnData, 0);
@ -1848,7 +1850,7 @@ int32_t qVgIdFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}
#endif
/** Aggregation functions **/
int32_t countScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {

View File

@ -56,13 +56,6 @@ struct SStreamTaskSM {
SArray* pWaitingEventList;
};
typedef struct SStreamEventInfo {
EStreamTaskEvent event;
const char* name;
} SStreamEventInfo;
// SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
// void* streamDestroyStateMachine(SStreamTaskSM* pSM);
#ifdef __cplusplus
}
#endif

View File

@ -1768,8 +1768,8 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_options_set_recycle_log_file_num(opts, 6);
rocksdb_options_set_max_write_buffer_number(opts, 3);
rocksdb_options_set_info_log_level(opts, 1);
rocksdb_options_set_db_write_buffer_size(opts, 64 << 20);
rocksdb_options_set_write_buffer_size(opts, 32 << 20);
rocksdb_options_set_db_write_buffer_size(opts, 256 << 20);
rocksdb_options_set_write_buffer_size(opts, 128 << 20);
rocksdb_options_set_atomic_flush(opts, 1);
pTaskDb->dbOpt = opts;
@ -1780,6 +1780,7 @@ void taskDbInitOpt(STaskDbWrapper* pTaskDb) {
rocksdb_options_set_compaction_filter_factory(pTaskDb->dbOpt, pTaskDb->filterFactory);
pTaskDb->readOpt = rocksdb_readoptions_create();
pTaskDb->writeOpt = rocksdb_writeoptions_create();
rocksdb_writeoptions_disable_WAL(pTaskDb->writeOpt, 1);
size_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
pTaskDb->pCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));

View File

@ -36,6 +36,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -50,6 +51,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
@ -151,7 +153,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// todo this status may not be set here.
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
ASSERT(code == TSDB_CODE_SUCCESS);
pTask->chkInfo.transId = pReq->transId;
pTask->chkInfo.checkpointingId = pReq->checkpointId;
@ -160,8 +163,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->execInfo.checkpoint += 1;
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
return code;
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
}
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
@ -315,8 +317,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
pCKInfo->checkpointVer = pCKInfo->processedVer;
streamTaskClearCheckInfo(p, false);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&p->lock);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else {
stDebug("s-task:%s vgId:%d status:%s not keep the checkpoint metaInfo, checkpoint:%" PRId64 " failed", id, vgId,
pStatus->name, pCKInfo->checkpointingId);
@ -459,6 +462,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int64_t startTs = pTask->chkInfo.startTs;
int64_t ckId = pTask->chkInfo.checkpointingId;
const char* id = pTask->id.idStr;
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
// sink task do not need to save the status, and generated the checkpoint
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
@ -497,6 +501,21 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
}
if ((code == TSDB_CODE_SUCCESS) && dropRelHTask) {
// transferred from the halt status, it is done the fill-history procedure and finish with the checkpoint
// free it and remove fill-history task from disk meta-store
taosThreadMutexLock(&pTask->lock);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId);
} else {
stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId);
}
taosThreadMutexUnlock(&pTask->lock);
}
// clear the checkpoint info if failed
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexLock(&pTask->lock);

View File

@ -340,7 +340,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} else {
double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.;
stDebug(
"s-task:%s fill-history task end, scal wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
"s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer "
"exec state",
id, el, pStreamTask->id.idStr);
}
@ -380,56 +380,34 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// 1. expand the query time window for stream task of WAL scanner
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
stDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, p, pStreamTask->status.schedStatus);
} else {
stDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
}
// 1. expand the query time window for stream task of WAL scanner
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
} else {
stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr);
stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr);
}
// 2. transfer the ownership of executor state
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. resume the state of stream task, after this function, the stream task will run immediately.
streamTaskResume(pStreamTask);
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask);
// streamTaskResume(pStreamTask);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
// 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. assign the status to the value that will be kept in disk
// 4. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 6. open the inputQ for all upstream tasks
// 5. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask);
// 7. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0;
pDelBlock->info.version = 0;
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
pItem->pBlock = pDelBlock;
int32_t code = streamTaskPutDataIntoInputQ(pStreamTask, (SStreamQueueItem*)pItem);
stDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
}
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_SUCCESS;
}
@ -447,14 +425,24 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task and open inputQ of sink task
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask != NULL) {
// halt the related stream sink task
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
pStreamTask->id.idStr, tstrerror(code));
streamMetaReleaseTask(pMeta, pStreamTask);
return code;
} else {
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
}
streamTaskOpenAllUpstreamInput(pStreamTask);
streamTaskSendCheckpointReq(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
}
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
}
return code;
@ -718,7 +706,8 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
} else {
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
st == TASK_STATUS__HALT);
}
}
@ -771,8 +760,7 @@ static int32_t schedTaskInFuture(SStreamTask* pTask) {
pTask->status.schedIdleTime, ref);
// add one ref count for task
// todo this may be failed, and add ref may be failed.
SStreamTask* pAddRefTask = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId);
/*SStreamTask* pAddRefTask = */streamMetaAcquireOneTask(pTask);
if (pTask->schedInfo.pIdleTimer == NULL) {
pTask->schedInfo.pIdleTimer = taosTmrStart(doStreamExecTaskHelper, pTask->status.schedIdleTime, pTask, streamTimer);
@ -788,29 +776,29 @@ int32_t streamResumeTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
while (1) {
/*int32_t code = */doStreamExecTask(pTask);
/*int32_t code = */ doStreamExecTask(pTask);
taosThreadMutexLock(&pTask->lock);
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
clearTaskSchedInfo(pTask);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
char* p = streamTaskGetStatus(pTask)->name;
stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
pTask->status.schedStatus, pTask->status.lastExecTs);
return 0;
} else {
int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue);
// check if this task needs to be idle for a while
if (pTask->status.schedIdleTime > 0) {
schedTaskInFuture(pTask);
if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
taosThreadMutexUnlock(&pTask->lock);
setLastExecTs(pTask, taosGetTimestampMs());
char* p = streamTaskGetStatus(pTask)->name;
stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p,
pTask->status.schedStatus, pTask->status.lastExecTs);
return 0;
}
}

View File

@ -257,8 +257,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
STaskDbWrapper* pBackend = *ppBackend;
pBackend->pMeta = pMeta;
pTask->backendRefId = pBackend->refId;
pTask->pBackend = pBackend;
taosThreadMutexUnlock(&pMeta->backendMutex);
@ -283,7 +281,6 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
}
int64_t tref = taosAddRef(taskDbWrapperId, pBackend);
pTask->backendRefId = tref;
pTask->pBackend = pBackend;
pBackend->refId = tref;
pBackend->pTask = pTask;
@ -467,7 +464,6 @@ void streamMetaClear(SStreamMeta* pMeta) {
}
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasksMap);
taosArrayClear(pMeta->pTaskList);
@ -505,7 +501,9 @@ void streamMetaCloseImpl(void* arg) {
return;
}
streamMetaWLock(pMeta);
streamMetaClear(pMeta);
streamMetaWUnLock(pMeta);
tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
@ -519,7 +517,6 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet);
// taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
@ -534,6 +531,8 @@ void streamMetaCloseImpl(void* arg) {
bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT;
taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
stDebug("end to close stream meta");
}
@ -647,6 +646,12 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
return p;
}
SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return pTask;
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
if (ref > 0) {
@ -724,14 +729,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
if (pTask->info.fillHistory == 1) {
streamTaskClearHTaskAttr(pTask);
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
streamTaskClearHTaskAttr(pTask, false);
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
streamMetaRemoveTask(pMeta, &id);
streamMetaWUnLock(pMeta);
ASSERT(pTask->status.timerActive == 0);
@ -742,13 +749,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaRemoveTask(pMeta, &id);
streamMetaReleaseTask(pMeta, pTask);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta);
}
streamMetaWUnLock(pMeta);
return 0;
}
@ -1269,11 +1275,11 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
// pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
// while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
// taosMsleep(100);
// stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
// }
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
}
stDebug("vgId:%d start to check all tasks", vgId);
@ -1306,28 +1312,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
}
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosThreadRwlockRdlock(&pMeta->lock);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code);
} else {
// stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
stDebug("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosThreadRwlockWrlock(&pMeta->lock);
// stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosThreadRwlockUnlock(&pMeta->lock);
}

View File

@ -1054,6 +1054,24 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0;
}
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;

View File

@ -733,20 +733,30 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
return status;
}
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
SStreamMeta* pMeta = pTask->pMeta;
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
if (pTask->info.fillHistory == 0) {
return TSDB_CODE_SUCCESS;
return 0;
}
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
if (metaLock) {
streamMetaWLock(pTask->pMeta);
}
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
if (ppStreamTask != NULL) {
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
(int32_t)sTaskId.taskId);
taosThreadMutexLock(&(*ppStreamTask)->lock);
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
}
if (metaLock) {
streamMetaWUnLock(pTask->pMeta);
}
return TSDB_CODE_SUCCESS;
@ -852,3 +862,41 @@ void streamTaskResume(SStreamTask* pTask) {
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
return pTask->info.taskLevel == TASK_LEVEL__SINK;
}
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1};
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}

View File

@ -31,9 +31,13 @@ SStreamTaskState StreamTaskStatusList[9] = {
{.state = TASK_STATUS__HALT, .name = "halt"},
{.state = TASK_STATUS__PAUSE, .name = "paused"},
{.state = TASK_STATUS__CK, .name = "checkpoint"},
// {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
};
typedef struct SStreamEventInfo {
EStreamTaskEvent event;
const char* name;
} SStreamEventInfo;
SStreamEventInfo StreamTaskEventList[12] = {
{.event = 0, .name = ""}, // dummy event, place holder
{.event = TASK_EVENT_INIT, .name = "initialize"},
@ -94,7 +98,9 @@ int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
}
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTask->hTaskInfo.haltVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
@ -402,6 +408,10 @@ SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask) {
return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment
}
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {
return pTask->status.pSM->prev.state.state;
}
const char* streamTaskGetStatusStr(ETaskStatus status) {
return StreamTaskStatusList[status].name;
}
@ -497,6 +507,8 @@ void doInitStateTransferTable(void) {
// checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
taosArrayPush(streamTaskSMTrans, &trans);

View File

@ -305,7 +305,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
}
int32_t walSkipFetchBody(SWalReader *pRead) {
wDebug("vgId:%d, skip fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
wDebug("vgId:%d, skip:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
", applied:%" PRId64 ", 0x%" PRIx64,
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);

View File

@ -740,6 +740,8 @@ char *tz_win[554][2] = {{"Asia/Shanghai", "China Standard Time"},
#include <unistd.h>
#endif
static int isdst_now = 0;
void taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, int8_t *outDaylight,
enum TdTimezone *tsTimezone) {
if (inTimezoneStr == NULL || inTimezoneStr[0] == 0) return;
@ -805,19 +807,19 @@ void taosSetSystemTimezone(const char *inTimezoneStr, char *outTimezoneStr, int8
tzset();
int32_t tz = (int32_t)((-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR);
*tsTimezone = tz;
tz += daylight;
tz += isdst_now;
sprintf(outTimezoneStr, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
*outDaylight = daylight;
sprintf(outTimezoneStr, "%s (%s, %s%02d00)", buf, tzname[isdst_now], tz >= 0 ? "+" : "-", abs(tz));
*outDaylight = isdst_now;
#else
setenv("TZ", buf, 1);
tzset();
int32_t tz = (int32_t)((-timezone * MILLISECOND_PER_SECOND) / MILLISECOND_PER_HOUR);
*tsTimezone = tz;
tz += daylight;
sprintf(outTimezoneStr, "%s (%s, %s%02d00)", buf, tzname[daylight], tz >= 0 ? "+" : "-", abs(tz));
*outDaylight = daylight;
tz += isdst_now;
sprintf(outTimezoneStr, "%s (%s, %s%02d00)", buf, tzname[isdst_now], tz >= 0 ? "+" : "-", abs(tz));
*outDaylight = isdst_now;
#endif
@ -895,6 +897,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
struct tm tm1;
taosLocalTime(&tx1, &tm1, NULL);
daylight = tm1.tm_isdst;
isdst_now = tm1.tm_isdst;
/*
* format example:
@ -1009,6 +1012,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
time_t tx1 = taosGetTimestampSec();
struct tm tm1;
taosLocalTime(&tx1, &tm1, NULL);
isdst_now = tm1.tm_isdst;
/*
* format example:

View File

@ -52,12 +52,12 @@ class TDTestCase(TBase):
tdLog.printNoPrefix("==========step3:fill data")
tdSql.query(f"select first(point_value) as pointValue from {dbname}.{tbname} where wstart between '2023-12-26 10:35:00' and '2023-12-26 10:40:00' interval(1M) fill(prev) order by wstart desc limit 100")
sql = f"select first(point_value) as pointValue from {dbname}.{tbname} where wstart between '2023-12-26 10:35:00' and '2023-12-26 10:40:00' interval(1M) fill(prev) order by wstart desc limit 100"
data = []
for i in range(6):
row = [5]
data.append(row)
tdSql.checkDataMem(data)
tdSql.checkDataMem(sql, data)
def stop(self):
tdSql.close()

View File

@ -53,7 +53,7 @@ class TDTestCase(TBase):
self.flushDb()
jfile = etool.curFile(__file__, "cquery_basic.json")
etool.benchMark(json = jfile)
def genTime(self, preCnt, cnt):
start = self.start_timestamp + preCnt * self.timestamp_step
@ -236,6 +236,165 @@ class TDTestCase(TBase):
if int(reals[k]) != v:
tdLog.exit(f"distribute {k} expect: {v} real: {reals[k]}")
def checkNull(self):
# abs unique concat_ws
ts = self.start_timestamp + 1
sql = f"insert into {self.db}.d0(ts) values({ts})"
tdSql.execute(sql)
sql = f'''select abs(fc),
unique(ic),
concat_ws(',',bin,nch),
timetruncate(bi,1s,0),
timediff(ic,bi,1s),
to_timestamp(nch,'yyyy-mm-dd hh:mi:ss.ms.us.ns')
from {self.db}.d0 where ts={ts}'''
tdSql.query(sql)
tdSql.checkData(0, 0, "None")
tdSql.checkData(0, 1, "None")
tdSql.checkData(0, 2, "None")
tdSql.checkData(0, 3, "None")
tdSql.checkData(0, 4, "None")
# substr from 0 start
sql1 = f"select substr(bin,1) from {self.db}.d0 order by ts desc limit 100"
sql2 = f"select bin from {self.db}.d0 order by ts desc limit 100"
self.checkSameResult(sql1, sql2)
#substr error input pos is zero
sql = f"select substr(bin,0,3) from {self.db}.d0 order by ts desc limit 100"
tdSql.error(sql)
# cast
nch = 99
sql = f"insert into {self.db}.d0(ts, nch) values({ts}, '{nch}')"
tdSql.execute(sql)
sql = f"select cast(nch as tinyint), \
cast(nch as tinyint unsigned), \
cast(nch as smallint), \
cast(nch as smallint unsigned), \
cast(nch as int unsigned), \
cast(nch as bigint unsigned), \
cast(nch as float), \
cast(nch as double), \
cast(nch as bool) \
from {self.db}.d0 where ts={ts}"
row = [nch, nch, nch, nch, nch, nch, nch, nch, True]
tdSql.checkDataMem(sql, [row])
# cast string is zero
ts += 1
sql = f"insert into {self.db}.d0(ts, nch) values({ts}, 'abcd')"
tdSql.execute(sql)
sql = f"select cast(nch as tinyint) from {self.db}.d0 where ts={ts}"
tdSql.checkFirstValue(sql, 0)
# iso8601
sql = f'select ts,to_iso8601(ts,"Z"),to_iso8601(ts,"+08"),to_iso8601(ts,"-08") from {self.db}.d0 where ts={self.start_timestamp}'
row = ['2023-11-15 06:13:20.000','2023-11-14T22:13:20.000Z','2023-11-15T06:13:20.000+08','2023-11-14T14:13:20.000-08']
tdSql.checkDataMem(sql, [row])
# constant expr funciton
# count
sql = f"select count(1),count(null) from {self.db}.d0"
tdSql.checkDataMem(sql, [[self.insert_rows+2, 0]])
row = [10, 11.0, "None", 2]
# sum
sql = "select sum(1+9),sum(1.1 + 9.9),sum(null),sum(4/2);"
tdSql.checkDataMem(sql, [row])
# min
sql = "select min(1+9),min(1.1 + 9.9),min(null),min(4/2);"
tdSql.checkDataMem(sql, [row])
# max
sql = "select max(1+9),max(1.1 + 9.9),max(null),max(4/2);"
tdSql.checkDataMem(sql, [row])
# avg
sql = "select avg(1+9),avg(1.1 + 9.9),avg(null),avg(4/2);"
tdSql.checkDataMem(sql, [row])
# stddev
sql = "select stddev(1+9),stddev(1.1 + 9.9),stddev(null),stddev(4/2);"
tdSql.checkDataMem(sql, [[0, 0.0, "None", 0]])
# leastsquares
sql = "select leastsquares(100,2,1), leastsquares(100.2,2.1,1);"
tdSql.query(sql)
# derivative
sql = "select derivative(190999,38.3,1);"
tdSql.checkFirstValue(sql, 0.0)
# irate
sql = "select irate(0);"
tdSql.checkFirstValue(sql, 0.0)
# diff
sql = "select diff(0);"
tdSql.checkFirstValue(sql, 0.0)
# twa
sql = "select twa(10);"
tdSql.checkFirstValue(sql, 10.0)
# mavg
sql = "select mavg(5,10);"
tdSql.checkFirstValue(sql, 5)
# mavg
sql = "select mavg(5,10);"
tdSql.checkFirstValue(sql, 5)
# mavg
sql = "select csum(4+9);"
tdSql.checkFirstValue(sql, 13)
# tail
sql = "select tail(1+9,1),tail(1.1 + 9.9,2),tail(null,3),tail(8/4,3);"
tdSql.error(sql)
sql = "select tail(4+9, 3);"
tdSql.checkFirstValue(sql, 13)
sql = "select tail(null, 1);"
tdSql.checkFirstValue(sql, "None")
# top
sql = "select top(4+9, 3);"
tdSql.checkFirstValue(sql, 13)
sql = "select top(9.9, 3);"
tdSql.checkFirstValue(sql, 9.9)
sql = "select top(null, 1);"
tdSql.error(sql)
# bottom
sql = "select bottom(4+9, 3);"
tdSql.checkFirstValue(sql, 13)
sql = "select bottom(9.9, 3);"
tdSql.checkFirstValue(sql, 9.9)
ops = ['GE', 'GT', 'LE', 'LT', 'EQ', 'NE']
vals = [-1, -1, 1, 1, -1, 1]
cnt = len(ops)
for i in range(cnt):
# statecount
sql = f"select statecount(99,'{ops[i]}',100);"
tdSql.checkFirstValue(sql, vals[i])
sql = f"select statecount(9.9,'{ops[i]}',11.1);"
tdSql.checkFirstValue(sql, vals[i])
# stateduration
sql = f"select stateduration(99,'{ops[i]}',100,1s);"
#tdSql.checkFirstValue(sql, vals[i]) bug need fix
tdSql.execute(sql)
sql = f"select stateduration(9.9,'{ops[i]}',11.1,1s);"
#tdSql.checkFirstValue(sql, vals[i]) bug need fix
tdSql.execute(sql)
# histogram check crash
sqls = [
'select histogram(200,"user_input","[10, 50, 200]",0);',
'select histogram(22.2,"user_input","[1.01, 5.01, 200.1]",0);',
'select histogram(200,"linear_bin",\'{"start": 0.0,"width": 5.0, "count": 5, "infinity": true}\',0)',
'select histogram(200.2,"linear_bin",\'{"start": 0.0,"width": 5.01, "count": 5, "infinity": true}\',0)',
'select histogram(200,"log_bin",\'{"start":1.0, "factor": 2.0, "count": 5, "infinity": true}\',0)',
'select histogram(200.2,"log_bin",\'{"start":1.0, "factor": 2.0, "count": 5, "infinity": true}\',0)'
]
tdSql.executes(sqls)
# errors check
sql = 'select histogram(200.2,"log_bin",\'start":1.0, "factor: 2.0, "count": 5, "infinity": true}\',0)'
tdSql.error(sql)
sql = 'select histogram("200.2","log_bin",\'start":1.0, "factor: 2.0, "count": 5, "infinity": true}\',0)'
tdSql.error(sql)
# first last
sql = "select first(100-90-1),last(2*5),first(11.1),last(22.2)"
tdSql.checkDataMem(sql, [[9, 10, 11.1, 22.2]])
# run
def run(self):
@ -253,6 +412,9 @@ class TDTestCase(TBase):
# do action
self.doQuery()
# check null
self.checkNull()
tdLog.success(f"{__file__} successfully executed")

View File

@ -254,8 +254,6 @@ class TDSql:
tdLog.info("sql:%s, expected expectErrInfo %s occured" % (sql, expectErrInfo))
else:
tdLog.exit("%s(%d) failed: sql:%s, expectErrInfo %s occured, but not expected errno %s" % (caller.filename, caller.lineno, sql, self.error_info, expectErrInfo))
else:
tdLog.info("sql:%s, expect error occured" % (sql))
return self.error_info
@ -402,7 +400,14 @@ class TDSql:
args = (caller.filename, caller.lineno, self.sql, row, col, self.res[row][col], data)
tdLog.exit("%s(%d) failed: sql:%s row:%d col:%d data:%s != expect:%s" % args)
else:
if self.res[row][col].astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
print(f"{self.res[row][col]}")
real = self.res[row][col]
if real is None:
# none
if str(real) == data:
if(show):
tdLog.info("check successfully")
elif real.astimezone(datetime.timezone.utc) == _parse_datetime(data).astimezone(datetime.timezone.utc):
# tdLog.info(f"sql:{self.sql}, row:{row} col:{col} data:{self.res[row][col]} == expect:{data}")
if(show):
tdLog.info("check successfully")
@ -490,7 +495,8 @@ class TDSql:
if(show):
tdLog.info("check successfully")
def checkDataMem(self, mem):
def checkDataMem(self, sql, mem):
self.query(sql)
if not isinstance(mem, list):
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql)
@ -506,7 +512,7 @@ class TDSql:
self.checkData(row, col, colData)
tdLog.info("check successfully")
def checkDataCsv(self, csvfilePath):
def checkDataCsv(self, sql, csvfilePath):
if not isinstance(csvfilePath, str) or len(csvfilePath) == 0:
caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, self.sql, csvfilePath)
@ -530,7 +536,7 @@ class TDSql:
tdLog.exit("%s(%d) failed: sql:%s, expect csvfile path:%s, read error:%s" % args)
tdLog.info("read csvfile read successfully")
self.checkDataMem(data)
self.checkDataMem(sql, data)
# return true or false replace exit, no print out
def checkRowColNoExit(self, row, col):

View File

@ -18,6 +18,7 @@ sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream1 trigger at_once fill_history 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
@ -224,53 +225,53 @@ endi
# row 2
if $data21 != 1 then
print ======$data21
print ======$data21, expect 1
goto loop01
endi
if $data22 != 1 then
print ======$data22
print ======$data22 , expect 1
goto loop01
endi
if $data23 != 3 then
print ======$data23
print ======$data23 , expect 3
goto loop01
endi
if $data24 != 2 then
print ======$data24
print ======$data24 , expect 2
goto loop01
endi
if $data25 != 3 then
print ======$data25
print ======$data25 , expect 3
goto loop01
endi
# row 3
if $data31 != 1 then
print ======$data31
print ======$data31 , expect 1
goto loop01
endi
if $data32 != 1 then
print ======$data32
print ======$data32 , expect 1
goto loop01
endi
if $data33 != 4 then
print ======$data33
print ======$data33 , expect 4
goto loop01
endi
if $data34 != 2 then
print ======$data34
print ======$data34 , expect 2
goto loop01
endi
if $data35 != 3 then
print ======$data35
print ======$data35 , expect 3
goto loop01
endi

View File

@ -152,7 +152,7 @@ class TDTestCase:
os.system(f"rm -rf {cPath}/../data")
print(self.projPath)
# this data file is special for coverage test in 192.168.1.96
os.system("cp -r f{self.projPath}/../comp_testdata/data/ {self.projPath}/sim/dnode1")
os.system(f"cp -r {self.projPath}/../comp_testdata/data/ {self.projPath}/community/sim/dnode1")
tdDnodes.stop(1)
tdDnodes.start(1)

View File

@ -224,6 +224,40 @@ class TDTestCase:
sql = f"select timediff(ts - {val}b, ts1) from st "
self.checkExpect(sql, val)
# timetruncate check
sql = '''select ts,timetruncate(ts,1u),
timetruncate(ts,1b),
timetruncate(ts,1m),
timetruncate(ts,1h),
timetruncate(ts,1w)
from t0 order by ts desc limit 1;'''
tdSql.query(sql)
tdSql.checkData(0,1, "2023-03-28 18:40:00.000009000")
tdSql.checkData(0,2, "2023-03-28 18:40:00.000009999")
tdSql.checkData(0,3, "2023-03-28 18:40:00.000000000")
tdSql.checkData(0,4, "2023-03-28 18:00:00.000000000")
tdSql.checkData(0,5, "2023-03-23 00:00:00.000000000")
# timediff
sql = '''select ts,timediff(ts,ts+1b,1b),
timediff(ts,ts+1u,1u),
timediff(ts,ts+1a,1a),
timediff(ts,ts+1s,1s),
timediff(ts,ts+1m,1m),
timediff(ts,ts+1h,1h),
timediff(ts,ts+1d,1d),
timediff(ts,ts+1w,1w)
from t0 order by ts desc limit 1;'''
tdSql.query(sql)
tdSql.checkData(0,1, 1)
tdSql.checkData(0,2, 1)
tdSql.checkData(0,3, 1)
tdSql.checkData(0,4, 1)
tdSql.checkData(0,5, 1)
tdSql.checkData(0,6, 1)
tdSql.checkData(0,7, 1)
tdSql.checkData(0,8, 1)
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.time() % 10000

View File

@ -218,6 +218,20 @@ class TDTestCase:
sql = f"select count(ts) from st where timediff(ts - {val}{uint}, ts1) = {usval} "
self.checkExpect(sql, expectVal)
# timetruncate check
sql = '''select ts,timetruncate(ts,1a),
timetruncate(ts,1s),
timetruncate(ts,1m),
timetruncate(ts,1h),
timetruncate(ts,1w)
from t0 order by ts desc limit 1;'''
tdSql.query(sql)
tdSql.checkData(0,1, "2023-03-28 18:40:00.009000")
tdSql.checkData(0,2, "2023-03-28 18:40:00.000000")
tdSql.checkData(0,3, "2023-03-28 18:40:00.000000")
tdSql.checkData(0,4, "2023-03-28 18:00:00.000000")
tdSql.checkData(0,5, "2023-03-23 00:00:00.000000")
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.time() % 10000

View File

@ -24,7 +24,8 @@ from util.dnodes import tdDnodes
from util.dnodes import *
class TDTestCase:
updatecfgDict = {'debugflag':0,'stdebugFlag': 143 ,"tqDebugflag":135}
def init(self, conn, logSql, replicaVar):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)

View File

@ -103,6 +103,10 @@ class TDTestCase:
f"select statecount(c1 ,'GT',1) , min(c1) from {dbname}.t1",
f"select statecount(c1 ,'GT',1) , spread(c1) from {dbname}.t1",
f"select statecount(c1 ,'GT',1) , diff(c1) from {dbname}.t1",
f"select statecount(c1 ,'GTA',1) , diff(c1) from {dbname}.t1",
f"select statecount(c1 ,'EQA',1) , diff(c1) from {dbname}.t1",
f"select statecount(c1 ,'',1) , diff(c1) from {dbname}.t1",
f"select statecount(c1 ,'E',1) , diff(c1) from {dbname}.t1",
]
for error_sql in error_sql_lists:
tdSql.error(error_sql)

View File

@ -215,7 +215,10 @@ class ClusterComCreate:
return
def alterStbMetaData(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=None):
tdLog.debug("alter Stb column ............")
tdLog.debug(f"describe {dbName}.{stbName} ")
tsql.execute(f"describe {dbName}.{stbName} ;")
tdLog.debug(f"ALTER STABLE {dbName}.{stbName} MODIFY COLUMN c3 binary(20);")
tsql.execute(f" ALTER STABLE {dbName}.{stbName} MODIFY COLUMN c3 binary(20);")
tdLog.debug(f"ALTER STABLE {dbName}.{stbName} ADD COLUMN c4 DOUBLE;")

View File

@ -6,8 +6,8 @@ from util.cases import *
from util.common import *
class TDTestCase:
updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
'asynclog': 0, 'stdebugflag':135}
updatecfgDict = {'debugFlag':0, 'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135,
'asynclog': 0, 'stdebugflag':143}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)