Merge pull request #29454 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch
This commit is contained in:
Shengliang Guan 2025-01-03 09:14:44 +08:00 committed by GitHub
commit de9c798b2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 2118 additions and 489 deletions

View File

@ -1,4 +1,4 @@
name: TaosKeeper CI
name: taosKeeper CI
on:
push:

402
.lgtm.yml
View File

@ -1,402 +0,0 @@
##########################################################################################
# Customize file classifications. #
# Results from files under any classifier will be excluded from LGTM #
# statistics. #
##########################################################################################
##########################################################################################
# Use the `path_classifiers` block to define changes to the default classification of #
# files. #
##########################################################################################
path_classifiers:
# docs:
# Identify the top-level file called `generate_javadoc.py` as documentation-related.
test:
# Override LGTM's default classification of test files by excluding all files.
- exclude: /
# Classify all files in the top-level directories tests/ and testsuites/ as test code.
- tests
# - testsuites
# Classify all files with suffix `.test` as test code.
# Note: use only forward slash / as a path separator.
# Use ** to indicate an arbitrary parent path.
# Use * to indicate any sequence of characters excluding /.
# Always enclose the expression in double quotes if it includes *.
# - "**/*.test"
# Refine the classifications above by excluding files in test/util/.
# - exclude: test/util
# The default behavior is to tag all files created during the
# build as `generated`. Results are hidden for generated code. You can tag
# further files as being generated by adding them to the `generated` section.
generated:
# Exclude all `*.c` files under the `ui/` directory from classification as
# generated code.
# - exclude: ui/**/*.c
# By default, all files not checked into the repository are considered to be
# 'generated'.
# The default behavior is to tag library code as `library`. Results are hidden
# for library code. You can tag further files as being library code by adding them
# to the `library` section.
library:
- exclude: deps/
# The default behavior is to tag template files as `template`. Results are hidden
# for template files. You can tag further files as being template files by adding
# them to the `template` section.
template:
#- exclude: path/to/template/code/**/*.c
# Define your own category, for example: 'some_custom_category'.
some_custom_category:
# Classify all files in the top-level directory tools/ (or the top-level file
# called tools).
# - tools
#########################################################################################
# Use the `queries` block to change the default display of query results. #
#########################################################################################
# queries:
# Start by hiding the results of all queries.
# - exclude: "*"
# Then include all queries tagged 'security' and 'correctness', and with a severity of
# 'error'.
# - include:
# tags:
# - "security"
# - "correctness"
# severity: "error"
# Specifically hide the results of two queries.
# - exclude: cpp/use-of-goto
# - exclude: java/equals-on-unrelated-types
# Refine by including the `java/command-line-injection` query.
# - include: java/command-line-injection
#########################################################################################
# Define changes to the default code extraction process. #
# Each block configures the extraction of a single language, and modifies actions in a #
# named step. Every named step includes automatic default actions, #
# except for the 'prepare' step. The steps are performed in the following sequence: #
# prepare #
# after_prepare #
# configure (C/C++ only) #
# python_setup (Python only) #
# before_index #
# index #
##########################################################################################
#########################################################################################
# Environment variables available to the steps: #
#########################################################################################
# LGTM_SRC
# The root of the source tree.
# LGTM_WORKSPACE
# An existing (initially empty) folder outside the source tree.
# Used for temporary download and setup commands.
#########################################################################################
# Use the extraction block to define changes to the default code extraction process #
# for one or more languages. The settings for each language are defined in a child #
# block, with one or more steps. #
#########################################################################################
extraction:
# Define settings for C/C++ analysis
#####################################
cpp:
# The `prepare` step exists for customization on LGTM.com only.
prepare:
# # The `packages` section is valid for LGTM.com only. It names Ubuntu packages to
# # be installed.
packages:
- cmake
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# This step is useful for C/C++ analysis where you want to prepare the environment
# for the `configure` step without changing the default behavior for that step.
# after_prepare:
#- export GNU_MAKE=make
#- export GIT=true
# The `configure` step generates build configuration files which the `index` step
# then uses to build the codebase.
configure:
command:
- mkdir build
- cd build
- cmake ..
# - ./prepare_deps
# Optional step. You should add a `before_index` step if you need to run commands
# before the `index` step.
# before_index:
# - export BOOST_DIR=$LGTM_SRC/boost
# - export GTEST_DIR=$LGTM_SRC/googletest
# - export HUNSPELL_DIR=$LGTM_SRC/hunspell
# - export CRYPTOPP_DIR=$LGTM_SRC/cryptopp
# The `index` step builds the code and extracts information during the build
# process.
index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
build_command:
- cd build
- make
# - $GNU_MAKE -j2 -s
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Specify MSBuild settings
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /p:Platform=x64 /p:Configuration=Release
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration:
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform:
# Specify the MSBuild target. Default: rebuild.
# target:
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files.
# vstools_version: 10
# Define settings for C# analysis
##################################
# csharp:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
#index:
# Specify that all project or solution files should be used for extraction.
# Default: false.
# all_solutions: true
# Specify a list of one or more project or solution files for extraction.
# Default: LGTM chooses the file closest to the root of the repository (this may
# fail if there are multiple candidates).
# solution:
# - myProject.sln
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./example-compile-all.sh
# By default, LGTM analyzes the code by building it. You can override this,
# and tell LGTM not to build the code. Beware that this can lead
# to less accurate results.
# buildless: true
# Specify .NET Core settings.
# dotnet:
# Specify additional arguments to `dotnet build`.
# Default: empty.
# arguments: "example_arg"
# Specify the version of .NET Core SDK to use.
# Default: The version installed on the build machine.
# version: 2.1
# Specify MSBuild settings.
# msbuild:
# Specify a list of additional arguments to MSBuild. Default: empty.
# arguments: /P:WarningLevel=2
# Specify the MSBuild configuration to use, for example, debug or release.
# Default: read from the solution file or files.
# configuration: release
# Specify the platform to target, for example: x86, x64, or Any CPU.
# Default: read from the solution file or files.
# platform: x86
# Specify the MSBuild target. Default: rebuild.
# target: notest
# Specify whether or not to perform a NuGet restore for extraction. Default: true.
# nuget_restore: false
# Specify a version of Microsoft Visual Studio to use for MSBuild or any custom
# build commands (build_command). For example:
# 10 for Visual Studio 2010
# 12 for Visual Studio 2012
# 14 for Visual Studio 2015
# 15 for Visual Studio 2017
# Default: read from project files
# vstools_version: 10
# Specify additional options for the extractor,
# for example --fast to perform a faster extraction that produces a smaller
# database.
# extractor: "--fast"
# Define settings for Go analysis
##################################
# go:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the `prepare` step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step builds the code and extracts information during the build
# process.
# index:
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command:
# - ./compile-all.sh
# Define settings for Java analysis
####################################
# java:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify Gradle settings.
# gradle:
# Specify the required Gradle version.
# Default: determined automatically.
# version: 4.4
# Override the autobuild process by specifying a list of custom build commands
# to use instead.
# build_command: ./compile-all.sh
# Specify the Java version required to build the project.
# java_version: 11
# Specify whether to extract Java .properties files
# Default: false
# properties_files: true
# Specify Maven settings.
# maven:
# Specify the path (absolute or relative) of a Maven settings file to use.
# Default: Maven uses a settings file in the default location, if it exists.
# settings_file: /opt/share/settings.xml
# Specify the path of a Maven toolchains file.
# Default: Maven uses a toolchains file in the default location, if it exists.
# toolchains_file: /opt/share/toolchains.xml
# Specify the required Maven version.
# Default: the Maven version is determined automatically, where feasible.
# version: 3.5.2
# Specify how XML files should be extracted:
# all = extract all XML files.
# default = only extract XML files named `AndroidManifest.xml`, `pom.xml`, and `web.xml`.
# disabled = do not extract any XML files.
# xml_mode: all
# Define settings for JavaScript analysis
##########################################
# javascript:
# The `prepare` step exists for customization on LGTM.com only.
# prepare:
# packages:
# - example_package
# Add an `after-prepare` step if you need to run commands after the prepare step.
# Each command should be listed on a separate line.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# The `index` step extracts information from the files in the codebase.
# index:
# Specify a list of files and folders to extract.
# Default: The project root directory.
# include:
# - src/js
# Specify a list of files and folders to exclude from extraction.
# exclude:
# - thirdparty/lib
# You can add additional file types for LGTM to extract, by mapping file
# extensions (including the leading dot) to file types. The usual
# include/exclude patterns apply, so, for example, `.jsm` files under
# `thirdparty/lib` will not be extracted.
# filetypes:
# ".jsm": "js"
# ".tmpl": "html"
# Specify a list of glob patterns to include/exclude files from extraction; this
# is applied on top of the include/exclude paths from above; patterns are
# processed in the same way as for path classifiers above.
# Default: include all files with known extensions (such as .js, .ts and .html),
# but exclude files ending in `-min.js` or `.min.js` and folders named `node_modules`
# or `bower_components`
# filters:
# exclude any *.ts files anywhere.
# - exclude: "**/*.ts"
# but include *.ts files under src/js/typescript.
# - include: "src/js/typescript/**/*.ts"
# Specify how TypeScript files should be extracted:
# none = exclude all TypeScript files.
# basic = extract syntactic information from TypeScript files.
# full = extract syntactic and type information from TypeScript files.
# Default: full.
# typescript: basic
# By default, LGTM doesn't extract any XML files. You can override this by
# using the `xml_mode` property and setting it to `all`.
# xml_mode: all
# Define settings for Python analysis
######################################
# python:
# # The `prepare` step exists for customization on LGTM.com only.
# # prepare:
# # # The `packages` section is valid for LGTM.com only. It names packages to
# # # be installed.
# # packages: libpng-dev
# # This step is useful for Python analysis where you want to prepare the
# # environment for the `python_setup` step without changing the default behavior
# # for that step.
# after_prepare:
# - export PATH=$LGTM_WORKSPACE/tools:$PATH
# # This sets up the Python interpreter and virtual environment, ready for the
# # `index` step to extract the codebase.
# python_setup:
# # Specify packages that should NOT be installed despite being mentioned in the
# # requirements.txt file.
# # Default: no package marked for exclusion.
# exclude_requirements:
# - pywin32
# # Specify a list of pip packages to install.
# # If any of these packages cannot be installed, the extraction will fail.
# requirements:
# - Pillow
# # Specify a list of requirements text files to use to set up the environment,
# # or false for none. Default: any requirements.txt, test-requirements.txt,
# # and similarly named files identified in the codebase are used.
# requirements_files:
# - required-packages.txt
# # Specify a setup.py file to use to set up the environment, or false for none.
# # Default: any setup.py files identified in the codebase are used in preference
# # to any requirements text files.
# setup_py: new-setup.py
# # Override the version of the Python interpreter used for setup and extraction
# # Default: Python 3.
# version: 2
# # Optional step. You should add a `before_index` step if you need to run commands
# # before the `index` step.
# before_index:
# - antlr4 -Dlanguage=Python3 Grammar.g4
# # The `index` step extracts information from the files in the codebase.
# index:
# # Specify a list of files and folders to exclude from extraction.
# # Default: Git submodules and Subversion externals.
# exclude:
# - legacy-implementation
# - thirdparty/libs
# filters:
# - exclude: "**/documentation/examples/snippets/*.py"
# - include: "**/documentation/examples/test_application/*"
# include:
# - example/to/include

View File

@ -1,4 +1,3 @@
<p>
<p align="center">
<a href="https://tdengine.com" target="_blank">
<img
@ -8,10 +7,12 @@
/>
</a>
</p>
<p>
[![Build Status](https://github.com/taosdata/TDengine/actions/workflows/taosd-ci-build.yml/badge.svg)](https://github.com/taosdata/TDengine/actions/workflows/taosd-ci-build.yml)
[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/taosdata/tdengine/taosd-ci-build.yml)](https://github.com/taosdata/TDengine/actions/workflows/taosd-ci-build.yml)
[![Coverage Status](https://coveralls.io/repos/github/taosdata/TDengine/badge.svg?branch=3.0)](https://coveralls.io/github/taosdata/TDengine?branch=3.0)
![GitHub commit activity](https://img.shields.io/github/commit-activity/m/taosdata/tdengine)
<br />
![GitHub Release](https://img.shields.io/github/v/release/taosdata/tdengine)
![GitHub License](https://img.shields.io/github/license/taosdata/tdengine)
[![CII Best Practices](https://bestpractices.coreinfrastructure.org/projects/4201/badge)](https://bestpractices.coreinfrastructure.org/projects/4201)
<br />
@ -20,9 +21,6 @@
[![Discord Community](https://img.shields.io/badge/Join_Discord--white?logo=discord&style=social)](https://discord.com/invite/VZdSuUg4pS)
[![LinkedIn](https://img.shields.io/badge/Follow_LinkedIn--white?logo=linkedin&style=social)](https://www.linkedin.com/company/tdengine)
[![StackOverflow](https://img.shields.io/badge/Ask_StackOverflow--white?logo=stackoverflow&style=social&logoColor=orange)](https://stackoverflow.com/questions/tagged/tdengine)
<br />
![GitHub Release](https://img.shields.io/github/v/release/taosdata/tdengine)
![GitHub commit activity](https://img.shields.io/github/commit-activity/m/taosdata/tdengine)
English | [简体中文](README-CN.md) | [TDengine Cloud](https://cloud.tdengine.com) | [Learn more about TSDB](https://tdengine.com/tsdb/)

View File

@ -109,7 +109,7 @@ If you are using Maven to manage your project, simply add the following dependen
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
```

View File

@ -0,0 +1,375 @@
---
sidebar_label: Flink
title: TDengine Flink Connector
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
Apache Flink is an open-source, distributed framework for unified stream and batch processing, maintained by the Apache Software Foundation. It is used in many big data scenarios, such as stream processing, batch processing, complex event processing, real-time data warehousing, and supplying real-time data to machine learning. Flink also provides a rich set of connectors and tools that can read from and write to many different types of data sources, and it offers reliable fault-tolerance mechanisms that keep jobs running stably even when unexpected failures occur.
With the TDengine Flink connector, Apache Flink integrates seamlessly with TDengine. On the one hand, the results of complex computation and deep analysis can be stored accurately in TDengine for efficient data storage and management; on the other hand, massive amounts of data can be read from TDengine quickly and reliably for further analysis, fully tapping the value of the data and providing solid support for business decisions, which greatly improves the efficiency and quality of data processing.
## Prerequisites
Prepare the following environment:
- A TDengine cluster is deployed and running normally (either the Enterprise or Community edition).
- taosAdapter is running normally.
- Apache Flink v1.19.0 or above is installed. For installation instructions, see the Apache Flink [official documentation](https://flink.apache.org/).
## Supported platforms
The Flink connector supports all platforms that can run Flink 1.19 or later.
## Version History
| Flink Connector Version | Major Changes | TDengine Version|
|-------------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1. Support SQL queries on data in the TDengine database <br/> 2. Support CDC subscription to data in the TDengine database <br/> 3. Support reading and writing to the TDengine database using Table SQL | 3.3.5.0 and above |
| 1.0.0 | Support the Sink function to write data from other sources into TDengine | 3.3.2.0 and above |
## Exception and error codes
After a task fails, check the Flink task execution log to confirm the cause of the failure, then refer to the following table:
| Error Code | Description | Suggested Actions |
| ---------------- |------------------------------------------------------- | -------------------- |
|0xa000 | connection param error | Connector parameter error.|
|0xa001 | the groupid parameter of CDC is incorrect | The groupid parameter of CDC is incorrect.|
|0xa002 | wrong topic parameter for CDC | The topic parameter for CDC is incorrect.|
|0xa010 | database name configuration error | database name configuration error.|
|0xa011 | table name configuration error | Table name configuration error.|
|0xa012 | no data was obtained from the data source | Failed to retrieve data from the data source.|
|0xa013 | value.deserializer parameter not set | The deserialization method is not set.|
|0xa014 | list of column names set incorrectly | List of column names for target table not set. |
|0x2301 | connection already closed | The connection has been closed. Check the connection status or create a new connection to execute the relevant instructions.|
|0x2302 | this operation is NOT supported currently | The current interface is not supported, you can switch to other connection methods.|
|0x2303 | invalid variables | The parameter is invalid. Please check the corresponding interface specification and adjust the parameter type and size.|
|0x2304 | statement is closed | Statement has already been closed. Please check if the statement is closed and reused, or if the connection is working properly.|
|0x2305 | resultSet is closed | The ResultSet has been released. Please check if the ResultSet has been released and used again.|
|0x230d | parameter index out of range | parameter out of range, please check the reasonable range of the parameter.|
|0x230e | connection already closed | The connection has been closed. Please check if the connection is closed and used again, or if the connection is working properly.|
|0x230f | unknown SQL type in TDengine | Please check the Data Type types supported by TDengine.|
|0x2315 | unknown taos type in TDengine | Check whether the correct TDengine data type was specified when converting a TDengine data type to a JDBC data type.|
|0x2319 | user is required | Username information is missing when creating a connection.|
|0x231a | password is required | Password information is missing when creating a connection.|
|0x231d | can't create connection with server within | Increase connection time by adding the parameter httpConnectTimeout, or check the connection status with taosAdapter.|
|0x231e | failed to complete the task within the specified time | Increase execution time by adding the parameter messageWaitTimeout, or check the connection with taosAdapter.|
|0x2352 | unsupported encoding | An unsupported character encoding set was specified under the local connection.|
|0x2353 | internal error of database, Please see taoslog for more details | An error occurred while executing prepareStatement on the local connection. Please check the taoslog for problem localization.|
|0x2354 | connection is NULL | Connection has already been closed while executing the command on the local connection. Please check the connection with TDengine.|
|0x2355 | result set is NULL | The result set obtained over the native connection is abnormal. Check the connection status and retry.|
|0x2356 | invalid num of fields | The meta information obtained from the local connection result set does not match.|
|0x2357 | empty SQL string | Fill in the correct SQL for execution.|
|0x2371 | consumer properties must not be null | When creating a subscription, the parameter is empty. Please fill in the correct parameter.|
|0x2375 | topic reference has been destroyed | During the process of creating a data subscription, the topic reference was released. Please check the connection with TDengine.|
|0x2376 | failed to set consumer topic, Topic name is empty | During the process of creating a data subscription, the subscription topic name is empty. Please check if the specified topic name is filled in correctly.|
|0x2377 | consumer reference has been destroyed | The subscription data transmission channel has been closed, please check the connection with TDengine.|
|0x2378 | consumer create error | Failed to create data subscription. Please check the taos log based on the error message to locate the problem.|
|0x237a | vGroup not found in result set | The vGroup is not assigned to the current consumer; due to the rebalance mechanism, the relationship between the consumer and the vGroup is not yet bound.|
## Data type mapping
TDengine currently supports timestamp, numeric, character, and boolean types; the corresponding conversions to Flink RowData types are as follows:
| TDengine DataType | Flink RowDataType |
| ----------------- | ------------------ |
| TIMESTAMP | TimestampData |
| INT | Integer |
| BIGINT | Long |
| FLOAT | Float |
| DOUBLE | Double |
| SMALLINT | Short |
| TINYINT | Byte |
| BOOL | Boolean |
| BINARY | byte[] |
| NCHAR | StringData |
| JSON | StringData |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
## Instructions for use
### Flink Semantic Selection Instructions
At-Least-Once semantics is used for the following reasons:
- TDengine does not currently support transactions, so it cannot perform frequent checkpoint operations or complex transaction coordination.
- Because TDengine uses the timestamp as the primary key, downstream operators can filter out duplicate data to avoid duplicate calculations.
- At-Least-Once ensures high data-processing performance and low data latency. It is configured as follows:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
```
If using Maven to manage a project, simply add the following dependencies in pom.xml.
```xml
<dependency>
<groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId>
<version>2.0.0</version>
</dependency>
```
The parameters for establishing a connection include URL and Properties.
The URL specification format is:
`jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]`
Parameter description (a minimal assembled example follows this list):
- user: the TDengine login username; default `root`.
- password: the user's login password; default `taosdata`.
- database_name: the database name.
- timezone: the time zone.
- httpConnectTimeout: the connection timeout in milliseconds; default 60000.
- messageWaitTimeout: the message timeout in milliseconds; default 60000.
- useSSL: whether SSL is used for the connection.
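For illustration, a minimal sketch of a fully assembled WebSocket URL with the parameters above; the host, port, database, and credentials are placeholders, and the exact query-parameter names are assumptions based on this list:
```java
public class ConnectionUrlSketch {
    // Hypothetical connection URL; adjust host, port, database, and credentials to your deployment.
    static final String URL = "jdbc:TAOS-WS://localhost:6041/power"
            + "?user=root&password=taosdata"
            + "&timezone=UTC-8"
            + "&httpConnectTimeout=60000"   // connection timeout in milliseconds
            + "&messageWaitTimeout=60000";  // message timeout in milliseconds
}
```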
### Source
The Source retrieves data from the TDengine database, converts it into a format and type that Flink can handle internally, and reads and distributes it in parallel, providing efficient input for subsequent data processing.
By setting the parallelism of the data source, multiple threads can read data in parallel, improving read efficiency and throughput and making full use of cluster resources for large-scale data processing.
#### Source Properties
The configuration parameters in Properties are as follows (a minimal sketch follows this list):
- TDengineConfigParams.PROPERTY_KEY_USER: the TDengine login username; default `root`.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: the user's login password; default `taosdata`.
- TDengineConfigParams.VALUE_DESERIALIZER: the deserialization method for the result set delivered to downstream operators. If the received result set type is Flink `RowData`, simply set it to `RowData`. You can also inherit `TDengineRecordDeserialization` and implement the `convert` and `getProducedType` methods to customize deserialization based on the SQL `ResultSet`.
- TDengineConfigParams.TD_BATCH_MODE: this parameter is used to push data to downstream operators in batches. If set to true, the `TDengine Source` object must be created with the data type specified as a template form of the `SourceRecords` type.
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: the message timeout in milliseconds; default 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: whether compression is enabled during transmission. true: enabled; false: disabled. Default: false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: whether automatic reconnection is enabled. true: enabled; false: disabled. Default: false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: the automatic reconnection retry interval in milliseconds; default 2000. Only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: the number of automatic reconnection retries; default 3. Only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: whether to disable SSL certificate verification. true: verification disabled; false: verification enabled. Default: false.
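A minimal sketch of assembling these properties, assuming the `TDengineConfigParams` constants listed above are string keys; the credential and value choices are illustrative, not a definitive configuration:
```java
import java.util.Properties;

import com.taosdata.flink.common.TDengineConfigParams;

public class SourcePropertiesSketch {
    // Builds a Properties object for a TDengine Source; values are illustrative placeholders.
    public static Properties sourceProperties() {
        Properties props = new Properties();
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_USER, "root");          // login user
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_PASSWORD, "taosdata");  // login password
        props.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");      // deliver Flink RowData downstream
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS, "2000");
        return props;
    }
}
```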
#### Split by time
Users can split the SQL query into multiple subtasks by time: specify the start time, end time, split interval, and time column name, and the system will split the query by the configured interval (intervals are left-closed, right-open) and fetch the data in parallel.
```java
{{#include docs/examples/flink/Main.java:time_interval}}
```
#### Split by super table TAG
Users can split the query SQL into multiple query conditions based on the TAG fields of the super table; the system splits these into one subtask per condition and fetches the data in parallel.
```java
{{#include docs/examples/flink/Main.java:tag_split}}
```
#### Split by table
Sharding is supported by supplying multiple super tables or regular tables with the same schema; the system splits them into one task per table and fetches the data in parallel.
```java
{{#include docs/examples/flink/Main.java:table_split}}
```
#### Use Source connector
Example where the query result is of the RowData type:
<details>
<summary>RowData Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_test}}
```
</details>
Example of batch query results:
<details>
<summary>Batch Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_batch_test}}
```
</details>
Example of custom data type query result:
<details>
<summary>Custom Type Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_custom_type_test}}
```
</details>
- ResultBean is a custom inner class that defines the data type of the Source query results (a hypothetical sketch follows this list).
- ResultSoureDeserialization is a custom inner class that inherits `TDengineRecordDeserialization` and implements the `convert` and `getProducedType` methods.
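For orientation, a hypothetical ResultBean for the example `meters` schema used later in this document (ts, current, voltage, phase); the field names and types are assumptions and must match the columns returned by your query:
```java
import java.sql.Timestamp;

// Hypothetical bean describing one row of the source query result.
public class ResultBean {
    private Timestamp ts;   // timestamp column
    private float current;  // current reading
    private int voltage;    // voltage reading
    private float phase;    // phase reading

    public Timestamp getTs() { return ts; }
    public void setTs(Timestamp ts) { this.ts = ts; }
    public float getCurrent() { return current; }
    public void setCurrent(float current) { this.current = current; }
    public int getVoltage() { return voltage; }
    public void setVoltage(int voltage) { this.voltage = voltage; }
    public float getPhase() { return phase; }
    public void setPhase(float phase) { this.phase = phase; }
}
```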
### CDC Data Subscription
Flink CDC is mainly used to provide data subscription functionality, which can monitor real-time changes in TDengine database data and transmit these changes in the form of data streams to Flink for processing, while ensuring data consistency and integrity.
Parameter description (a minimal Properties sketch follows this list):
- TDengineCdcParams.BOOTSTRAP_SERVERS: `ip:port` of the TDengine server, if using WebSocket connection, then it is the `ip:port` where taosAdapter is located.
- TDengineCdcParams.CONNECT_USER: the TDengine login username; default `root`.
- TDengineCdcParams.CONNECT_PASS: the user's login password; default `taosdata`.
- TDengineCdcParams.POLL_INTERVAL_MS: Pull data interval, default 500ms.
- TDengineCdcParams.VALUE_DESERIALIZER: the result set deserialization method. If the received result set type is Flink `RowData`, simply set it to `RowData`. You can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer`, specify the result set bean, and implement deserialization; you can also inherit `com.taosdata.jdbc.tmq.Deserializer` and customize deserialization based on the SQL resultSet.
- TDengineCdcParams.TMQ_BATCH_MODE: This parameter is used to batch push data to downstream operators. If set to True, when creating the `TDengineCdcSource` object, it is necessary to specify the data type as a template form of the `ConsumerRecords` type.
- TDengineCdcParams.GROUP_ID: the consumer group ID; consumers in the same group share consumption progress. Maximum length: 192.
- TDengineCdcParams.AUTO_OFFSET_RESET: the initial position of the consumer group subscription (`earliest`: subscribe from the beginning; `latest`: subscribe from the latest data). Default: `latest`.
- TDengineCdcParams.ENABLE_AUTO_COMMIT: whether to enable automatic consumption offset commit. true: commit automatically; false: commit based on the `checkpoint` time. Default: false.
> **Note**: In automatic commit mode, the reader commits offsets as soon as data is fetched, regardless of whether downstream operators have processed the data correctly. This carries a risk of data loss and is mainly intended for efficient stateless operators or scenarios with low data-consistency requirements.
- TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS: Time interval for automatically submitting consumption records, in milliseconds, default 5000. This parameter takes effect when `ENABLE_AUTO_COMMIT` is set to true.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: Is compression enabled during the transmission process. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: Whether to enable automatic reconnection. true: Enable, false: Not enabled. The default is false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: Automatic reconnection retry interval, in milliseconds, default value 2000. It only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: The default value for automatic reconnection retry is 3, which only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS: the timeout after a consumer heartbeat is lost; rebalance logic is then triggered and, upon success, that consumer is removed (supported since version 3.3.3.0). Default: 12000; range [6000, 1800000].
- TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS: the maximum interval between consumer polls; if exceeded, the consumer is considered offline, rebalance logic is triggered and, upon success, that consumer is removed (supported since version 3.3.3.0). Default: 300000; range [1000, INT32_MAX].
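A minimal sketch of CDC consumer properties, assuming the `TDengineCdcParams` constants listed above are string keys; the server address, group ID, and other values are illustrative assumptions:
```java
import java.util.Properties;

import com.taosdata.flink.common.TDengineCdcParams;

public class CdcPropertiesSketch {
    // Builds consumer properties for a TDengine CDC source; values are illustrative placeholders.
    public static Properties cdcProperties() {
        Properties props = new Properties();
        props.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041"); // taosAdapter ip:port (assumed)
        props.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        props.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        props.setProperty(TDengineCdcParams.GROUP_ID, "flink-demo-group");        // shared consumption progress
        props.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");       // start from the beginning
        props.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
        props.setProperty(TDengineCdcParams.POLL_INTERVAL_MS, "500");             // poll interval in milliseconds
        return props;
    }
}
```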
#### Use CDC connector
The CDC connector will create consumers based on the parallelism set by the user, so the user should set the parallelism reasonably according to the resource situation.
Example where the subscription result is of the RowData type:
<details>
<summary>CDC Source</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_source}}
```
</details>
Example of batch query results:
<details>
<summary>CDC Batch Source</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_batch_source}}
```
</details>
Example of custom data type query result:
<details>
<summary>CDC Custom Type</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_custom_type_test}}
```
</details>
- ResultBean is a custom inner class whose field names and data types correspond one-to-one with the column names and data types, so that the deserialization class corresponding to the value.deserializer property can deserialize objects of the ResultBean type.
### Sink
The core function of Sink is to efficiently and accurately write data processed by Flink, from different data sources or operators, into TDengine. In this process, TDengine's efficient write mechanism plays a crucial role, ensuring fast and stable storage of data.
#### Sink Properties
The configuration parameters in Properties are as follows (a minimal sketch follows this list):
- TDengineConfigParams.PROPERTY_KEY_USER: the TDengine login username; default `root`.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: the user's login password; default `taosdata`.
- TDengineConfigParams.PROPERTY_KEY_DBNAME: the database name.
- TDengineConfigParams.TD_SUPERTABLE_NAME: the name of the super table. The received data must contain a tbname field to determine which subtable to write to.
- TDengineConfigParams.TD_TABLE_NAME: the name of a subtable or a normal table. This parameter only needs to be set together with `TD_SUPERTABLE_NAME`.
- TDengineConfigParams.VALUE_DESERIALIZER: the deserialization method for the received result set. If the received result set type is Flink `RowData`, simply set it to `RowData`. You can also inherit `TDengineSinkRecordSequencer` and implement the `serialize` method to customize serialization based on the received data type.
- TDengineConfigParams.TD_BATCH_SIZE: the batch size for a single write to the TDengine database. Writing is triggered when the batch size is reached or when a checkpoint occurs.
- TDengineConfigParams.TD_BATCH_MODE: when set to true for receiving batch data, if the data source is `TDengine Source`, create the `TDengineSink` object with the `SourceRecords` template type; if the source is `TDengine CDC`, create it with the `ConsumerRecords` template type.
- TDengineConfigParams.TD_SOURCE_TYPE: sets the data source. When the data source is `TDengine Source`, set it to `tdengine_source`; when the source is `TDengine CDC`, set it to `tdengine_cdc`. Takes effect when `TD_BATCH_MODE` is set to true.
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: the message timeout in milliseconds; default 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: whether compression is enabled during transmission. true: enabled; false: disabled. Default: false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: whether automatic reconnection is enabled. true: enabled; false: disabled. Default: false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: the automatic reconnection retry interval in milliseconds; default 2000. Only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: the number of automatic reconnection retries; default 3. Only takes effect when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: whether to disable SSL certificate verification. true: verification disabled; false: verification enabled. Default: false.
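A minimal sketch of Sink properties, assuming the `TDengineConfigParams` constants listed above are string keys; the database and super table names follow the usage example below, and the batch size is an illustrative assumption:
```java
import java.util.Properties;

import com.taosdata.flink.common.TDengineConfigParams;

public class SinkPropertiesSketch {
    // Builds a Properties object for a TDengineSink; values are illustrative placeholders.
    public static Properties sinkProperties() {
        Properties props = new Properties();
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_USER, "root");
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_PASSWORD, "taosdata");
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_DBNAME, "power_sink"); // target database
        props.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters"); // records must carry a tbname field
        props.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        props.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");             // flush after 2000 rows or at checkpoint
        return props;
    }
}
```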
Usage example:
Write the subtable data of the meters table in the power database into the corresponding subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Sink RowData</summary>
```java
{{#include docs/examples/flink/Main.java:RowDataToSink}}
```
</details>
Usage example:
Subscribe to the subtable data of the meters super table in the power database and write it to the corresponding subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Cdc Sink</summary>
```java
{{#include docs/examples/flink/Main.java:CdcRowDataToSink}}
```
</details>
### Table SQL
Extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.) using Table SQL, perform custom operator operations (such as data cleaning, format conversion, associating data from different tables, etc.), and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
#### Source connector
Parameter configuration instructions (a registration sketch follows the table):
| Parameter Name | Type | Parameter Description |
|-----------------------| :-----: | ------------ |
| connector | string | connector identifier, set `tdengine-connector`|
| td.jdbc.url | string | url of the connection |
| td.jdbc.mode | string | connector type: `source`, `sink`|
| table.name | string | original or target table name |
| scan.query | string | SQL statement to retrieve data|
| sink.db.name | string | target database name|
| sink.supertable.name | string | name of the supertable|
| sink.batch.size | integer| batch size written|
| sink.table.name | string | the table name of a sub table or a normal table |
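For orientation, a sketch of registering a TDengine-backed source table with the options above using the Flink Table API; the column list, URL, and scan query are illustrative assumptions based on the example meters schema in this document, not a definitive configuration:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a source table backed by the TDengine connector; options follow the table above.
        tEnv.executeSql(
            "CREATE TABLE meters_source (" +
            "  ts TIMESTAMP(3)," +
            "  `current` FLOAT," +
            "  voltage INT," +
            "  phase FLOAT" +
            ") WITH (" +
            "  'connector' = 'tdengine-connector'," +
            "  'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata'," +
            "  'td.jdbc.mode' = 'source'," +
            "  'table.name' = 'meters'," +
            "  'scan.query' = 'SELECT ts, current, voltage, phase FROM power.meters'" +
            ")");
    }
}
```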
Usage example:
Write the subtable data of the meters table in the power database into the corresponding subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Table Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_table}}
```
</details>
#### Table CDC connector
Parameter configuration instructions (a registration sketch follows the table):
| Parameter Name | Type | Parameter Description |
|-------------------| :-----: |--------------------------------------------------------------------------------------|
| connector | string | connector identifier, set `tdengine-connector` |
| user | string | username, default root |
| password | string | password, default taosdata |
| bootstrap.servers | string | server address |
| topic | string | topic to subscribe to |
| td.jdbc.mode | string | connector type: `cdc`, `sink` |
| group.id | string | consumer group ID; consumers in the same group share consumption progress |
| auto.offset.reset | string | initial position for the consumer group subscription. <br/> `earliest`: subscribe from the beginning <br/> `latest`: subscribe from the latest data <br/> default: `latest`|
| poll.interval_ms | integer | data polling interval, default 500ms |
| sink.db.name | string | target database name |
| sink.supertable.name | string | name of the supertable |
| sink.batch.size | integer | batch size written |
| sink.table.name | string | the table name of a sub table or a normal table |
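A corresponding sketch for registering a CDC-backed table; the topic name, group ID, and column list are illustrative assumptions:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableCdcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Register a CDC table backed by the TDengine connector; options follow the table above.
        tEnv.executeSql(
            "CREATE TABLE meters_cdc (" +
            "  ts TIMESTAMP(3)," +
            "  `current` FLOAT," +
            "  voltage INT," +
            "  phase FLOAT" +
            ") WITH (" +
            "  'connector' = 'tdengine-connector'," +
            "  'td.jdbc.mode' = 'cdc'," +
            "  'user' = 'root'," +
            "  'password' = 'taosdata'," +
            "  'bootstrap.servers' = 'localhost:6041'," +
            "  'topic' = 'topic_meters'," +
            "  'group.id' = 'flink-table-demo'," +
            "  'auto.offset.reset' = 'earliest'," +
            "  'poll.interval_ms' = '500'" +
            ")");
    }
}
```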
Usage example:
Subscribe to the subtable data of the meters super table in the power database and write it to the corresponding subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Table CDC</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_table}}
```
</details>

View File

@ -33,6 +33,7 @@ The JDBC driver implementation for TDengine strives to be consistent with relati
| taos-jdbcdriver Version | Major Changes | TDengine Version |
| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------ |
| 3.5.1 | Fixed the getObject issue in data subscription. | - |
| 3.5.0 | 1. Optimized the performance of WebSocket connection parameter binding, supporting parameter binding queries using binary data. <br/> 2. Optimized the performance of small queries in WebSocket connection. <br/> 3. Added support for setting time zone and app info on WebSocket connection. | 3.3.5.0 and higher |
| 3.4.0 | 1. Replaced fastjson library with jackson. <br/> 2. WebSocket uses a separate protocol identifier. <br/> 3. Optimized background thread usage to avoid user misuse leading to timeouts. | - |
| 3.3.4 | Fixed getInt error when data type is float. | - |

View File

@ -297,3 +297,22 @@ Reporting this error indicates that the first connection to the cluster was succ
Therefore, first check whether all required ports on the server and cluster (by default, 6030 for native connections and 6041 for HTTP connections) are open; next, check whether the client's hosts file contains the FQDN and IP information for all dnodes in the cluster.
If the issue still cannot be resolved, contact TDengine technical support.
### 32 Why is the original database lost and the cluster ID changed when the data directory dataDir of the database remains unchanged on the same server?
Background: When the TDengine server process (taosd) starts, if there are no valid data file subdirectories (such as mnode, dnode, and vnode) under the data directory (dataDir, which is specified in the configuration file taos.cfg), these directories are created automatically. When a new mnode directory is created, a new cluster ID is allocated, generating a new cluster.
Cause analysis: The data directory dataDir of taosd can point to multiple different mount points. If these mount points are not configured for automatic mounting in the fstab file, then after the server restarts, dataDir exists only as a normal directory on the local disk and no longer points to the mounted disk as expected. If the taosd service is then started, it creates new directories under dataDir, generating a new cluster.
Impact of the problem: After the server restarts, the original database appears lost (note: it is not really lost; the original data disk is simply not mounted and temporarily not visible) and the cluster ID changes, making the original database inaccessible. Enterprise users who were authorized by cluster ID will also find that, although the machine code of the cluster server has not changed, the original authorization has expired. If the problem is not monitored and handled in time, the user may not notice that the original database is missing, resulting in losses and increased operation and maintenance costs.
Problem solving: Configure automatic mounting of the dataDir directory in the fstab file so that dataDir always points to the expected mount point and directory. Restarting the server then restores the original database and cluster. In a subsequent version, we will add a feature that makes taosd exit during startup when it detects that dataDir changed between runs, with a corresponding error prompt.
### 33 How to resolve the msvcp140.dll missing error when running TDengine on the Windows platform?
1. Reinstall Microsoft Visual C++ Redistributable: since msvcp140.dll is part of the Microsoft Visual C++ Redistributable, reinstalling this package usually resolves most issues. You can download the corresponding version from the official Microsoft website and install it.
2. Manually download and replace the msvcp140.dll file: download the msvcp140.dll file from a reliable source and copy it to the corresponding system directory. Make sure the downloaded file matches your system architecture (32-bit or 64-bit) and that the source is trustworthy.
### 34 Which is faster: querying a super table with a TAG filter, or querying a child table directly?
Querying a child table directly is faster. Querying a super table with a TAG filter is designed for convenience, because it can filter data from multiple child tables at the same time. If the goal is performance and the target child table is already known, querying the child table directly achieves higher performance.
### 35 How to view data compression ratio indicators?
Currently, TDengine only provides compression ratios per table, not for databases or the entire system. To view the compression ratios, execute the `SHOW TABLE DISTRIBUTED table_name;` command in the TDengine CLI (taos). The table_name can be a super table, regular table, or subtable. For details, [click here](https://docs.tdengine.com/tdengine-reference/sql-manual/show-commands/#show-table-distributed).

View File

@ -25,6 +25,10 @@ Download links for TDengine 3.x version installation packages are as follows:
import Release from "/components/ReleaseV3";
## 3.3.5.0
<Release type="tdengine" version="3.3.5.0" />
## 3.3.4.8
<Release type="tdengine" version="3.3.4.8" />

View File

@ -0,0 +1,85 @@
---
title: TDengine 3.3.5.0 Release Notes
sidebar_label: 3.3.5.0
description: Version 3.3.5.0 Notes
slug: /release-history/release-notes/3.3.5.0
---
## Features
1. feat: refactor MQTT to improve stability and performance
2. feat: refactor taosX incremental backup-restore
3. feat: add stmt2 apis in JDBC via websocket connection
4. feat: add stmt2 api in Rust connector
5. feat: add error codes in error prompts in taos-CLI
6. feat: Superset can connect to TDengine with the Python connector
7. feat: configurable Grafana dashboards in explorer management
8. feat: add taosX-agent in-memory cache queue capacity option
## Enhancements
1. enh: adjust the reporting mechanism of telemetry.
2. enh: support for SQL-based statistics of disk space for a specified DB.
3. enh: add memory management for SQL queries on the server side
4. enh: interval clause allows the use of the AUTO keyword to specify the window offset.
5. enh: reduce the impact on data write performance during data migration across multi-level storage
6. enh: migrate from angular to react for grafana 11.3+
7. enh: refactor taosAdapter websocket api for a slightly better perf
8. enh: add health state in taosX task status
9. enh: taosX add configurations to handle exceptions
10. enh: support setting options for client connections, including time zone, character set, user IP, and user name.
11. enh: taosdump support retry after connection timeout or broken
12. enh: allow creating index for tags that already subscribed
13. enh: taosX now support literal special chars in password
14. enh: improve data write performance when Last Cache is activated.
15. enh: compact command supports automatic execution, concurrency setting, and progress observation.
16. enh: support update global configuration parameters through SQL statements and persisting them.
17. enh: update the default compression method for all data types to improve the compression ratio in most scenarios.
18. enh: taosBenchmark --nodrop fix for macOS/Windows
19. enh: prohibit the simultaneous execution of DB compaction and replica change operations (Enterprise).
20. enh: taosdump support primary key tables
21. enh: display user IP and name in the results of the SHOW QUERIES and SHOW CONNECTIONS statements.
22. enh: (JDBC)support batch insertion into multiple tables
23. enh: support for dynamically modifying the dataDir parameter for multi-level storage.
24. enh: prefer db file under data_dir
25. enh: enforce users to set strong passwords, which must be 8 to 16 characters in length and include at least three types of characters from the following: uppercase letters, lowercase letters, numbers, and special characters.
26. enh: improve the speed at which clients acquire the new Leader.
27. enh: support negative regex pattern in opc point selector
## Fixes
1. fix: the potential for deadlocks when updating checkpoints in stream computing under high-load scenarios.
2. fix: write tmq data into target error when terrno already set
3. fix: taosd cannot start when there is data corruption in a block within the WAL
4. fix: taosBenchmark fails when taosd disconnected in replica 2/3
5. fix: log files being lost when they are switched frequently.
6. fix: the stream computing stops due to the data update within the window.
7. fix: libtaosws.so sets an incorrect error code when the connection is terminated while fetching data.
8. fix: taosX opc error in case of @-prefixed name
9. fix: fix permission denied with show vgroups sql in cloud
10. fix: fix sql syntax error when migrating from large stables with compress options
11. fix: incorrect memory estimation for vnode usage
12. fix: failed to perform UNION ALL query on constant strings of the varchar type.
13. fix: leader transfer during the execution of transaction may cause deadlock.
14. fix: Rust connector invalid pointer address in ws_stmt_get_tag_fields
15. fix: union statement fails when executing with subqueries containing multiple NULLs.
16. fix: the pause operation of stream computing might fail.
17. fix: when writing data into a sub-table with a table name length of 192 characters using an SQL statement, errors may occur if the table name is enclosed in backticks (`).
18. fix: when performing a join query on super tables across different databases, if each database contains only one vnode, the query will return an error.
19. fix: no enough disk space cause taosX panic
20. fix: when write data to a super table, using both bound and unbound simultaneously will trigger an exception.
21. fix: metrics non-exist cause panic when connect with agent
22. fix: when creating indexes for tag with a large character length, taosd may crash.
23. fix: when the input parameters for the functions first, last, last_row, and char exceed 127, the taosd may crash. https://github.com/taosdata/TDengine/issues/29241
24. fix: when the number of rows in the result set of the LIMIT statement exceeds the size of a single data block, the returned count does not match the expectation.
25. fix: when synchronizing data between clusters, if the target task is deleted, the source cluster may run out of memory
26. fix: metadata read-write lock misconfiguration leads to a very small chance of blocking writes.
27. fix: when importing CSV files using the INSERT INTO statement on the Windows platform, the absence of a newline character at the end of the file may lead to an issue of infinite loop reading.
28. fix: after the tags of the table are updated, the stream computing fails to recognize and apply the new values.
29. fix: fix kafka timeout issue and improve performance and stability
30. fix: in sql queries, when both 'is null' and invalid 'in' filter conditions are included simultaneously, the query results are incorrect. https://github.com/taosdata/TDengine/issues/29067
31. fix: sql queries containing both 'IN' and 'BETWEEN' filter conditions result in incorrect query results. https://github.com/taosdata/TDengine/issues/28989
32. fix: when performing multiplication or division operations between timestamp and numeric types, the results are incorrect. https://github.com/taosdata/TDengine/issues/28339
33. fix: data type conversion error in the IN statement leads to incorrect query results. https://github.com/taosdata/TDengine/issues/29047 https://github.com/taosdata/TDengine/issues/28902
34. fix: the error in filtering results when constant conditions are combined with OR operators. https://github.com/taosdata/TDengine/issues/28904
35. fix: when performing subtraction operation on timestamp type, the negative value is not considered. https://github.com/taosdata/TDengine/issues/28906
36. fix: tag values may display incorrectly when using GROUP BY tag syntax
37. fix: gcc < 10 bug cause taosX compile error

View File

@ -5,6 +5,7 @@ slug: /release-history/release-notes
[3.3.4.8](./3-3-4-8/)
[3.3.5.0](./3.3.5.0)
[3.3.4.3](./3-3-4-3/)
[3.3.3.0](./3-3-3-0/)

View File

@ -19,7 +19,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>

View File

@ -47,7 +47,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
</dependencies>

View File

@ -18,7 +18,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
<!-- druid -->
<dependency>

View File

@ -17,7 +17,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>

View File

@ -47,7 +47,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
<dependency>

View File

@ -70,7 +70,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
<dependency>

View File

@ -67,7 +67,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
<!-- <scope>system</scope>-->
<!-- <systemPath>${project.basedir}/src/main/resources/lib/taos-jdbcdriver-2.0.15-dist.jar</systemPath>-->
</dependency>

View File

@ -0,0 +1,579 @@
package com.taosdata.flink.example;
import com.taosdata.flink.cdc.TDengineCdcSource;
import com.taosdata.flink.common.TDengineCdcParams;
import com.taosdata.flink.common.TDengineConfigParams;
import com.taosdata.flink.sink.TDengineSink;
import com.taosdata.flink.source.TDengineSource;
import com.taosdata.flink.source.entity.SourceSplitSql;
import com.taosdata.flink.source.entity.SplitType;
import com.taosdata.flink.source.entity.TimestampSplitInfo;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.shaded.curator5.com.google.common.base.Strings;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.GenericRowData;
import com.taosdata.flink.source.entity.SourceRecords;
import com.taosdata.jdbc.tmq.ConsumerRecord;
// ResultBean and its deserializers are user-defined helper classes; the package below is assumed.
import com.taosdata.flink.entity.ResultBean;
import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
public class Main {
// Accumulates the voltage values seen by the example pipelines.
static AtomicInteger totalVoltage = new AtomicInteger(0);
static String jdbcUrl = "jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata";
static void prepare() throws ClassNotFoundException, SQLException {
Properties properties = new Properties();
properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
String insertQuery = "INSERT INTO " +
"power.d1001 USING power.meters TAGS('California.SanFrancisco', 1) " +
"VALUES " +
"('2024-12-19 19:12:45.642', 50.30000, 201, 0.31000) " +
"('2024-12-19 19:12:46.642', 82.60000, 202, 0.33000) " +
"('2024-12-19 19:12:47.642', 92.30000, 203, 0.31000) " +
"('2024-12-19 18:12:45.642', 50.30000, 201, 0.31000) " +
"('2024-12-19 18:12:46.642', 82.60000, 202, 0.33000) " +
"('2024-12-19 18:12:47.642', 92.30000, 203, 0.31000) " +
"('2024-12-19 17:12:45.642', 50.30000, 201, 0.31000) " +
"('2024-12-19 17:12:46.642', 82.60000, 202, 0.33000) " +
"('2024-12-19 17:12:47.642', 92.30000, 203, 0.31000) " +
"power.d1002 USING power.meters TAGS('Alabama.Montgomery', 2) " +
"VALUES " +
"('2024-12-19 19:12:45.642', 50.30000, 204, 0.25000) " +
"('2024-12-19 19:12:46.642', 62.60000, 205, 0.33000) " +
"('2024-12-19 19:12:47.642', 72.30000, 206, 0.31000) " +
"('2024-12-19 18:12:45.642', 50.30000, 204, 0.25000) " +
"('2024-12-19 18:12:46.642', 62.60000, 205, 0.33000) " +
"('2024-12-19 18:12:47.642', 72.30000, 206, 0.31000) " +
"('2024-12-19 17:12:45.642', 50.30000, 204, 0.25000) " +
"('2024-12-19 17:12:46.642', 62.60000, 205, 0.33000) " +
"('2024-12-19 17:12:47.642', 72.30000, 206, 0.31000) ";
Class.forName("com.taosdata.jdbc.ws.WebSocketDriver");
try (Connection connection = DriverManager.getConnection(jdbcUrl, properties);
Statement stmt = connection.createStatement()) {
stmt.executeUpdate("DROP TOPIC IF EXISTS topic_meters");
stmt.executeUpdate("DROP database IF EXISTS power");
// create database
int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power vgroups 5");
stmt.executeUpdate("use power");
// you can check rowsAffected here
System.out.println("Create database power successfully, rowsAffected: " + rowsAffected);
// create table
rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);");
// you can check rowsAffected here
System.out.println("Create stable power.meters successfully, rowsAffected: " + rowsAffected);
stmt.executeUpdate("CREATE TOPIC topic_meters as SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM meters");
int affectedRows = stmt.executeUpdate(insertQuery);
// you can check affectedRows here
System.out.println("Successfully inserted " + affectedRows + " rows to power.meters.");
stmt.executeUpdate("DROP database IF EXISTS power_sink");
// create database
stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power_sink vgroups 5");
stmt.executeUpdate("use power_sink");
// you can check rowsAffected here
System.out.println("Create database power successfully, rowsAffected: " + rowsAffected);
// create table
stmt.executeUpdate("CREATE STABLE IF NOT EXISTS sink_meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);");
// you can check rowsAffected here
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS sink_normal (ts timestamp, current float, voltage int, phase float);");
// you can check rowsAffected here
} catch (Exception ex) {
// please refer to the JDBC specifications for detailed exceptions info
System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n",
ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "",
ex.getMessage());
// Print stack trace for context in examples. Use logging in production.
throw ex;
}
}
public static void main(String[] args) throws Exception {
prepare();
if (args != null && args.length > 0 && args[0].equals("source")) {
testSource();
} else if (args != null && args.length > 0 && args[0].equals("table")) {
testTableToSink();
} else if (args != null && args.length > 0 && args[0].equals("cdc")) {
testCustomTypeCdc();
}else if (args != null && args.length > 0 && args[0].equals("table-cdc")) {
testCdcTableToSink();
}
}
static SourceSplitSql getTimeSplit() {
// ANCHOR: time_interval
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
// ANCHOR_END: time_interval
return splitSql;
}
static SourceSplitSql getTagSplit() throws Exception {
// ANCHOR: tag_split
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, current, voltage, phase, groupid, location from meters where voltage > 100")
.setTagList(Arrays.asList("groupid >100 and location = 'Shanghai'",
"groupid >50 and groupid < 100 and location = 'Guangzhou'",
"groupid >0 and groupid < 50 and location = 'Beijing'"))
.setSplitType(SplitType.SPLIT_TYPE_TAG);
// ANCHOR_END: tag_split
return splitSql;
}
static SourceSplitSql getTableSplit() {
// ANCHOR: table_split
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSelect("ts, current, voltage, phase, groupid, location")
.setTableList(Arrays.asList("d1001", "d1002"))
.setOther("order by ts limit 100")
.setSplitType(SplitType.SPLIT_TYPE_TABLE);
// ANCHOR_END: table_split
return splitSql;
}
//ANCHOR: source_test
static void testSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
TDengineSource<RowData> source = new TDengineSource<>(connProps, splitSql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + new String(rowData.getBinary(4)));
sb.append("\n");
return sb.toString();
});
resultStream.print();
env.execute("tdengine flink source");
}
//ANCHOR_END: source_test
//ANCHOR: source_custom_type_test
void testCustomTypeSource() throws Exception {
System.out.println("testTDengineSourceByTimeSplit start");
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultSoureDeserialization");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
// split by time range
.setTimestampSplitInfo(new TimestampSplitInfo(
"2024-12-19 16:12:48.000",
"2024-12-19 19:12:48.000",
"ts",
Duration.ofHours(1),
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
ZoneId.of("Asia/Shanghai")));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
TDengineSource<ResultBean> source = new TDengineSource<>(connProps, splitSql, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
//ANCHOR_END: source_custom_type_test
//ANCHOR: source_batch_test
void testBatchSource() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass);
DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> {
StringBuilder sb = new StringBuilder();
Iterator<RowData> iterator = records.iterator();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next();
sb.append("ts: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + new String(row.getBinary(4)));
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
env.execute("flink tdengine source");
}
//ANCHOR_END: source_batch_test
//ANCHOR: cdc_source
void testTDengineCdc() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.ENABLE_AUTO_COMMIT, "true");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("tsxx: " + rowData.getTimestamp(0, 0) +
", current: " + rowData.getFloat(1) +
", voltage: " + rowData.getInt(2) +
", phase: " + rowData.getFloat(3) +
", location: " + new String(rowData.getBinary(4)));
sb.append("\n");
totalVoltage.addAndGet(rowData.getInt(2));
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
// A job submitted from the Flink UI cannot be cancelled here and must be stopped on the UI page.
jobClient.cancel().get();
}
//ANCHOR_END: cdc_source
//ANCHOR: cdc_batch_source
void testTDengineCdcBatch() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");
Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> {
Iterator<ConsumerRecord<RowData>> iterator = records.iterator();
StringBuilder sb = new StringBuilder();
while (iterator.hasNext()) {
GenericRowData row = (GenericRowData) iterator.next().value();
sb.append("tsxx: " + row.getTimestamp(0, 0) +
", current: " + row.getFloat(1) +
", voltage: " + row.getInt(2) +
", phase: " + row.getFloat(3) +
", location: " + new String(row.getBinary(4)));
sb.append("\n");
totalVoltage.addAndGet(row.getInt(2));
}
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
//ANCHOR_END: cdc_batch_source
//ANCHOR: cdc_custom_type_test
static void testCustomTypeCdc() throws Exception {
System.out.println("testCustomTypeTDengineCdc start");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(100, AT_LEAST_ONCE);
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(4);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() +
", current: " + rowData.getCurrent() +
", voltage: " + rowData.getVoltage() +
", phase: " + rowData.getPhase() +
", groupid: " + rowData.getGroupid() +
", location" + rowData.getLocation() +
", tbname: " + rowData.getTbname());
sb.append("\n");
totalVoltage.addAndGet(rowData.getVoltage());
return sb.toString();
});
resultStream.print();
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(5000L);
jobClient.cancel().get();
}
//ANCHOR_END: cdc_custom_type_test
//ANCHOR: RowDataToSink
static void testRowDataToSink() throws Exception {
Properties connProps = new Properties();
connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
TDengineSource<RowData> source = new TDengineSource<>(connProps, sql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
Properties sinkProps = new Properties();
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_source");
sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
// The list of target table field names must match the order of the fields in the incoming data.
TDengineSink<RowData> sink = new TDengineSink<>(sinkProps,
Arrays.asList("ts", "current", "voltage", "phase", "groupid", "location", "tbname"));
input.sinkTo(sink);
env.execute("flink tdengine source");
}
//ANCHOR_END: RowDataToSink
//ANCHOR: CdcRowDataToSink
static void testCdcToSink() throws Exception {
System.out.println("testTDengineCdcToTdSink start");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(500, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(5000);
Properties config = new Properties();
config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
Properties sinkProps = new Properties();
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
TDengineSink<RowData> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
input.sinkTo(sink);
JobClient jobClient = env.executeAsync("Flink test cdc Example");
Thread.sleep(6000L);
jobClient.cancel().get();
System.out.println("testTDengineCdcToTdSink finish");
}
//ANCHOR_END: CdcRowDataToSink
//ANCHOR: source_table
static void testTableToSink() throws Exception {
System.out.println("testTableToSink start");
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
" 'td.jdbc.mode' = 'source'," +
" 'table-name' = 'meters'," +
" 'scan.query' = 'SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'sink'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
}
//ANCHOR_END: source_table
//ANCHOR: cdc_table
static void testCdcTableToSink() throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(5);
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'bootstrap.servers' = 'localhost:6041'," +
" 'td.jdbc.mode' = 'cdc'," +
" 'group.id' = 'group_22'," +
" 'auto.offset.reset' = 'earliest'," +
" 'enable.auto.commit' = 'false'," +
" 'topic' = 'topic_meters'" +
")";
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
" ts TIMESTAMP," +
" `current` FLOAT," +
" voltage INT," +
" phase FLOAT," +
" location VARBINARY," +
" groupid INT," +
" tbname VARBINARY" +
") WITH (" +
" 'connector' = 'tdengine-connector'," +
" 'td.jdbc.mode' = 'cdc'," +
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
" 'sink.db.name' = 'power_sink'," +
" 'sink.supertable.name' = 'sink_meters'" +
")";
tableEnv.executeSql(tdengineSourceTableDDL);
tableEnv.executeSql(tdengineSinkTableDDL);
TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
Thread.sleep(5000L);
tableResult.getJobClient().get().cancel().get();
}
//ANCHOR_END: cdc_table
}

View File

@ -22,7 +22,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
<!-- ANCHOR_END: dep-->

View File

@ -89,7 +89,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.5.0</version>
<version>3.5.1</version>
</dependency>
```

View File

@ -0,0 +1,371 @@
---
sidebar_label: Flink
title: TDengine Flink Connector
---
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
Apache Flink is an open-source, distributed stream and batch processing framework supported by the Apache Software Foundation. It can be used for stream processing, batch processing, complex event processing, real-time data warehousing, and supplying real-time data to machine learning, among many other big-data scenarios. Flink also ships with a rich set of connectors and tools for reading from and writing to many kinds of data sources, and it provides reliable fault-tolerance mechanisms so that jobs keep running stably even when something goes wrong.
With the TDengine Flink connector, Apache Flink integrates seamlessly with TDengine: results produced by complex computation and analysis in Flink can be stored in TDengine for efficient management, and large volumes of data can be read from TDengine quickly and reliably for further analysis, improving both the efficiency and the quality of data processing.
## Prerequisites
Prepare the following environment:
- A TDengine cluster has been deployed and is running normally (Enterprise or Community Edition).
- taosAdapter is running normally. See the [taosAdapter reference](../../../reference/components/taosadapter).
- Apache Flink v1.19.0 or later is installed. See the [official documentation](https://flink.apache.org/) for installation instructions.
## Supported Platforms
The Flink connector supports all platforms that can run Flink 1.19 or later.
## Version History
| Flink Connector Version | Major Changes | TDengine Version |
| ------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1. Query data in TDengine with SQL<br/> 2. Subscribe to data in TDengine via CDC<br/> 3. Read from and write to TDengine with Table SQL | 3.3.5.0 and later |
| 1.0.0 | Sink support: write data from other data sources into TDengine | 3.3.2.0 and later |
## Exceptions and Error Codes
After a task fails, check the Flink job log to determine the cause of the failure.
The specific error codes are listed below:
| Error Code | Description | Suggested Actions |
| ---------------- |------------------------------------------------------- | -------------------- |
| 0xa000 |connection param error |Connector parameter error.|
| 0xa001 |the groupid parameter of CDC is incorrect |The CDC groupid parameter is incorrect.|
| 0xa002 |wrong topic parameter for CDC |The CDC topic parameter is incorrect.|
| 0xa010 |database name configuration error |The database name is misconfigured.|
| 0xa011 |table name configuration error |The table name is misconfigured.|
| 0xa012 |no data was obtained from the data source |Failed to fetch data from the data source.|
| 0xa013 |value.deserializer parameter not set |No deserialization method was set.|
| 0xa014 |list of column names for target table not set |The column name list of the target table was not set.|
| 0x2301 |connection already closed |The connection is closed; check its status or re-create it before executing the statement.|
| 0x2302 |this operation is NOT supported currently! |The current interface is not supported; switch to another connection method.|
| 0x2303 |invalid variables |Invalid parameters; check the interface specification and adjust the parameter types and sizes.|
| 0x2304 |statement is closed |The statement is already closed; check whether it was closed before being reused, or whether the connection is healthy.|
| 0x2305 |resultSet is closed |The result set has been released; check whether it was released before being reused.|
| 0x230d |parameter index out of range |Parameter out of range; check its valid range.|
| 0x230e |connection already closed |The connection is already closed; check whether it was closed before being reused, or whether the connection is healthy.|
| 0x230f |unknown sql type in TDengine |Check the data types supported by TDengine.|
| 0x2315 |unknown taos type in TDengine |Check that a valid TDengine data type was specified when converting between TDengine and JDBC data types.|
| 0x2319 |user is required |The user name is missing when creating the connection.|
| 0x231a |password is required |The password is missing when creating the connection.|
| 0x231d |can't create connection with server within |Increase the httpConnectTimeout parameter, or check the connection to taosAdapter.|
| 0x231e |failed to complete the task within the specified time |Increase the messageWaitTimeout parameter, or check the connection to taosAdapter.|
| 0x2352 |Unsupported encoding |An unsupported character encoding was specified for the native connection.|
| 0x2353 |internal error of database, please see taoslog for more details |An error occurred while executing prepareStatement over the native connection; check the taos log to locate the problem.|
| 0x2354 |connection is NULL |The connection was already closed while executing a command over the native connection; check the connection to TDengine.|
| 0x2355 |result set is NULL |The result set obtained over the native connection is abnormal; check the connection and retry.|
| 0x2356 |invalid num of fields |The result set metadata obtained over the native connection does not match.|
| 0x2357 |empty sql string |Provide a valid SQL statement to execute.|
| 0x2371 |consumer properties must not be null! |The parameters were empty when creating the subscription; provide valid parameters.|
| 0x2375 |topic reference has been destroyed |The topic reference was released while creating the data subscription; check the connection to TDengine.|
| 0x2376 |failed to set consumer topic, topic name is empty |The topic name is empty while creating the data subscription; check that the specified topic name is correct.|
| 0x2377 |consumer reference has been destroyed |The subscription data transfer channel is closed; check the connection to TDengine.|
| 0x2378 |consumer create error |Failed to create the subscription; check the taos log based on the error message to locate the problem.|
| 0x237a |vGroup not found in result set VGroup |No vGroup is assigned to the current consumer; because of the rebalance mechanism, consumers and vGroups are not bound to each other.|
## Data Type Mapping
TDengine currently supports timestamp, numeric, character, and boolean types. Their mapping to Flink RowData types is as follows (a short reading sketch follows the table):
| TDengine DataType | Flink RowDataType |
| ----------------- | ------------------ |
| TIMESTAMP | TimestampData |
| INT | Integer |
| BIGINT | Long |
| FLOAT | Float |
| DOUBLE | Double |
| SMALLINT | Short |
| TINYINT | Byte |
| BOOL | Boolean |
| BINARY | byte[] |
| NCHAR | StringData |
| JSON | StringData |
| VARBINARY | byte[] |
| GEOMETRY | byte[] |
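To illustrate this mapping, the following is a minimal sketch added for this page (not part of the connector API). It assumes a row produced by a query such as select ts, `current`, voltage, phase, location from meters, so the column order is an assumption of the example.
```java
import org.apache.flink.table.data.RowData;

public class RowDataMappingSketch {
    // TIMESTAMP -> TimestampData, FLOAT -> Float, INT -> Integer, BINARY -> byte[]
    public static String format(RowData row) {
        return "ts=" + row.getTimestamp(0, 0)                    // TIMESTAMP column
                + ", current=" + row.getFloat(1)                 // FLOAT column
                + ", voltage=" + row.getInt(2)                   // INT column
                + ", phase=" + row.getFloat(3)                   // FLOAT column
                + ", location=" + new String(row.getBinary(4));  // BINARY tag column
    }
}
```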
## Usage Notes
### Flink Semantics
The connector uses At-Least-Once semantics, for the following reasons:
- TDengine does not currently support transactions, so it cannot perform frequent checkpointing or complex transaction coordination.
- TDengine uses the timestamp as the primary key, so downstream operators can filter out duplicate data and avoid double counting.
- At-Least-Once keeps data-processing throughput high and latency low.
It is configured as follows:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
```
If you manage the project with Maven, simply add the following dependency to pom.xml:
```xml
<dependency>
<groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId>
<version>2.0.0</version>
</dependency>
```
### Connection Parameters
A connection is established with a URL and Properties (a short sketch follows the parameter list below).
The URL format is:
`jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]`
Parameter descriptions:
- user: TDengine user name, default 'root'.
- password: user password, default 'taosdata'.
- database_name: database name.
- timezone: time zone setting.
- httpConnectTimeout: connection timeout in ms, default 60000.
- messageWaitTimeout: message timeout in ms, default 60000.
- useSSL: whether SSL is used for the connection.
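As a minimal sketch of these parameters (the host, database, and credentials below are placeholders), the URL and Properties for a WebSocket connection might be assembled as follows:
```java
import java.util.Properties;
import com.taosdata.flink.common.TDengineConfigParams;

public class ConnectionSketch {
    public static Properties buildProps() {
        // Host/port of taosAdapter, target database, user and password are placeholders.
        String url = "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata&timezone=UTC-8";
        Properties props = new Properties();
        props.setProperty(TDengineConfigParams.TD_JDBC_URL, url);
        // Optional connection behaviour; values here are illustrative.
        props.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
        props.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        return props;
    }
}
```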
### Source
The Source reads data from TDengine, converts it into formats and types that Flink can process internally, and reads and distributes it in parallel, providing efficient input for subsequent processing.
By setting the parallelism of the source, multiple threads read from the data source in parallel, improving read efficiency and throughput and making full use of cluster resources.
The following parameters can be set in Properties:
- TDengineConfigParams.PROPERTY_KEY_USER: TDengine user name, default 'root'.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: user password, default 'taosdata'.
- TDengineConfigParams.VALUE_DESERIALIZER: deserialization method for the result set received by downstream operators. If the received result set type is Flink `RowData`, just set it to `RowData`. You can also extend `TDengineRecordDeserialization` and implement the `convert` and `getProducedType` methods to customize deserialization based on the `SQL` and the `ResultSet`.
- TDengineConfigParams.TD_BATCH_MODE: pushes data to downstream operators in batches. If set to true, the `TDengineSource` object must be created with the generic type `SourceRecords` (see the sketch after this list).
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message timeout in ms, default 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: whether compression is enabled during transmission. true: enabled, false: disabled. Default false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: whether automatic reconnection is enabled. true: enabled, false: disabled. Default false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: retry interval for automatic reconnection in ms, default 2000. Takes effect only when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: retry count for automatic reconnection, default 3. Takes effect only when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: disables SSL certificate validation. true: enabled, false: disabled. Default false.
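A minimal sketch of a batch-mode source, assuming the power database used elsewhere on this page; the `SourceRecords` import path is inferred from the connector's other entity classes. With `TD_BATCH_MODE` enabled the element type becomes `SourceRecords<RowData>`:
```java
import java.util.Properties;
import org.apache.flink.table.data.RowData;
import com.taosdata.flink.common.TDengineConfigParams;
import com.taosdata.flink.source.TDengineSource;
import com.taosdata.flink.source.entity.SourceRecords;
import com.taosdata.flink.source.entity.SourceSplitSql;

public class BatchModeSourceSketch {
    @SuppressWarnings("unchecked")
    public static TDengineSource<SourceRecords<RowData>> build() {
        Properties props = new Properties();
        props.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        props.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true"); // deliver rows in batches
        props.setProperty(TDengineConfigParams.TD_JDBC_URL,
                "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
        // In batch mode the element type is SourceRecords<RowData> rather than RowData.
        Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
        return new TDengineSource<>(props, sql, typeClass);
    }
}
```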
#### Splitting by Time
The query SQL can be split by time into multiple subtasks. Given a start time, an end time, a split interval, and the name of the time column, the system splits the range into intervals (closed on the left, open on the right) and fetches the data in parallel.
```java
{{#include docs/examples/flink/Main.java:time_interval}}
```
#### Splitting by Super Table TAG
The query SQL can be split into multiple query conditions based on the TAG columns of the super table; each condition becomes one subtask, and the data is fetched in parallel.
```java
{{#include docs/examples/flink/Main.java:tag_split}}
```
#### Splitting by Table Name
Multiple super tables or regular tables with the same schema can be split so that each table is handled by one subtask, and the data is fetched in parallel.
```java
{{#include docs/examples/flink/Main.java:table_split}}
```
#### Using the Source Connector
Example where the query results are of type RowData:
<details>
<summary>RowData Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_test}}
```
</details>
Example of querying results in batches:
<details>
<summary>Batch Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_batch_test}}
```
</details>
Example where the query results are of a custom data type:
<details>
<summary>Custom Type Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_custom_type_test}}
```
</details>
- ResultBean is a custom inner class that defines the data type of the Source query results.
- ResultSoureDeserialization is a custom inner class that extends `TDengineRecordDeserialization` and implements the `convert` and `getProducedType` methods.
### CDC Data Subscription
Flink CDC provides data subscription: it monitors data changes in `TDengine` in real time and streams those changes into `Flink` for processing while preserving data consistency and integrity.
The following parameters can be set in Properties:
- TDengineCdcParams.BOOTSTRAP_SERVERS: `ip:port` of the TDengine server; when using a `WebSocket` connection, this is the `ip:port` of taosAdapter.
- TDengineCdcParams.CONNECT_USER: TDengine user name, default 'root'.
- TDengineCdcParams.CONNECT_PASS: user password, default 'taosdata'.
- TDengineCdcParams.POLL_INTERVAL_MS: polling interval, default 500 ms.
- TDengineCdcParams.VALUE_DESERIALIZER: deserialization method for the result set. If the received result set type is Flink `RowData`, just set it to `RowData`. You can also extend `com.taosdata.jdbc.tmq.ReferenceDeserializer` and specify a result bean to implement deserialization.
- TDengineCdcParams.TMQ_BATCH_MODE: pushes data to downstream operators in batches. If set to true, the `TDengineCdcSource` object must be created with the generic type `ConsumerRecords`.
- TDengineCdcParams.GROUP_ID: consumer group ID; consumers in the same group share the consumption progress. Maximum length: 192.
- TDengineCdcParams.AUTO_OFFSET_RESET: initial position of the consumer group subscription (`earliest`: subscribe from the beginning, `latest`: subscribe only to new data; default `latest`).
- TDengineCdcParams.ENABLE_AUTO_COMMIT: whether automatic offset commit is enabled. true: commit automatically; false: commit based on `checkpoint` timing. Default false.
> **Note**: In auto-commit mode the reader commits offsets as soon as the data has been fetched, regardless of whether downstream operators processed it correctly, so data may be lost. It is mainly intended for high-throughput stateless operators or scenarios with relaxed consistency requirements (see the sketch after this list for a checkpoint-based configuration).
- TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS: interval in ms for automatically committing offsets, default 5000. Takes effect only when `ENABLE_AUTO_COMMIT` is true.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: whether compression is enabled during transmission. true: enabled, false: disabled. Default false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: whether automatic reconnection is enabled. true: enabled, false: disabled. Default false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: retry interval for automatic reconnection in ms, default 2000. Takes effect only when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: retry count for automatic reconnection, default 3. Takes effect only when `PROPERTY_KEY_ENABLE_AUTO_RECONNECT` is true.
- TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS: timeout after a `consumer` heartbeat is lost; the `rebalance` logic is triggered and, on success, the `consumer` is removed (supported since version 3.3.3.0). Default 12000, range [6000, 1800000].
- TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS: maximum interval between `consumer poll` fetches; beyond this the consumer is considered offline, the `rebalance` logic is triggered and, on success, the consumer is removed. Default 300000, range [1000, INT32_MAX].
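The following is a minimal sketch of the checkpoint-based commit configuration mentioned in the note above; the server address, topic, and group ID are placeholders. Auto-commit is disabled, so offsets are committed only when a checkpoint succeeds.
```java
import java.util.Properties;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import com.taosdata.flink.cdc.TDengineCdcSource;
import com.taosdata.flink.common.TDengineCdcParams;

public class CdcCheckpointCommitSketch {
    public static TDengineCdcSource<RowData> build(StreamExecutionEnvironment env) {
        // Commit offsets on successful checkpoints instead of auto-committing after each fetch.
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        Properties config = new Properties();
        config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
        config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
        config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
        config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
        config.setProperty(TDengineCdcParams.ENABLE_AUTO_COMMIT, "false"); // rely on checkpoints to commit
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
        return new TDengineCdcSource<>("topic_meters", config, RowData.class);
    }
}
```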
#### Using the CDC Connector
The CDC connector creates consumers according to the configured parallelism, so set the parallelism appropriately for the available resources.
Example where the subscription results are of type RowData:
<details>
<summary>CDC Source</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_source}}
```
</details>
Example of delivering the subscription results to operators in batches:
<details>
<summary>CDC Batch Source</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_batch_source}}
```
</details>
Example where the subscription results are of a custom data type:
<details>
<summary>CDC Custom Type</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_custom_type_test}}
```
</details>
- ResultBean is a custom inner class whose field names and data types correspond one-to-one with the column names and data types, so the deserialization class specified by `TDengineCdcParams.VALUE_DESERIALIZER` can deserialize objects of type ResultBean.
### Sink
The core function of the Sink is to write data that `Flink` has processed, whatever its source or upstream operator, into `TDengine` efficiently and accurately. TDengine's efficient write path ensures fast and stable storage.
The following parameters can be set in Properties:
- TDengineConfigParams.PROPERTY_KEY_USER: `TDengine` user name, default 'root'.
- TDengineConfigParams.PROPERTY_KEY_PASSWORD: user password, default 'taosdata'.
- TDengineConfigParams.PROPERTY_KEY_DBNAME: name of the database to write to.
- TDengineConfigParams.TD_SUPERTABLE_NAME: name of the super table to write to. The incoming data must contain a tbname field that determines which subtable to write to.
- TDengineConfigParams.TD_TABLE_NAME: name of the subtable or regular table to write to. Only one of this parameter and TD_SUPERTABLE_NAME needs to be set (see the sketch after this list).
- TDengineConfigParams.VALUE_DESERIALIZER: serialization method for the received data. If the received type is Flink `RowData`, just set it to `RowData`. You can also extend `TDengineSinkRecordSerializer` and implement the `serialize` method to customize serialization for the received data type.
- TDengineConfigParams.TD_BATCH_SIZE: batch size for a single write to `TDengine`; a write is triggered when the batch is full or when a checkpoint occurs.
- TDengineConfigParams.TD_BATCH_MODE: receives data in batches. When set to true, use the `SourceRecords` generic type to create the `TDengineSink` object if the data comes from a `TDengine Source`, or the `ConsumerRecords` generic type if it comes from `TDengine CDC`.
- TDengineConfigParams.TD_SOURCE_TYPE: sets the data source type: 'tdengine_source' when the data comes from a `TDengine Source`, 'tdengine_cdc' when it comes from `TDengine CDC`. Takes effect only when `TD_BATCH_MODE` is true.
- TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message timeout in ms, default 60000.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION: whether compression is enabled during transmission. true: enabled, false: disabled. Default false.
- TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT: whether automatic reconnection is enabled. true: enabled, false: disabled. Default false.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS: retry interval for automatic reconnection in ms, default 2000. Takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT: retry count for automatic reconnection, default 3. Takes effect only when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true.
- TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION: disables SSL certificate validation. true: enabled, false: disabled. Default false.
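A minimal sketch of writing into a single table via `TD_TABLE_NAME`, assuming the sink_normal table created in the example program on this page; because a concrete table is specified, the incoming rows do not need a tbname field.
```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.table.data.RowData;
import com.taosdata.flink.common.TDengineConfigParams;
import com.taosdata.flink.sink.TDengineSink;

public class NormalTableSinkSketch {
    public static TDengineSink<RowData> build() {
        Properties sinkProps = new Properties();
        sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
        // Write into one concrete table, so TD_TABLE_NAME is set instead of TD_SUPERTABLE_NAME.
        sinkProps.setProperty(TDengineConfigParams.TD_TABLE_NAME, "sink_normal");
        sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL,
                "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
        sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
        // Field names must match the order of the fields in the incoming RowData.
        return new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase"));
    }
}
```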
Usage example:
Write the subtable data of the meters table in the power database into the subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Sink RowData</summary>
```java
{{#include docs/examples/flink/Main.java:RowDataToSink}}
```
</details>
Usage example:
Subscribe to the subtable data of the meters super table in the power database and write it into the subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Cdc Sink</summary>
```java
{{#include docs/examples/flink/Main.java:CdcRowDataToSink}}
```
</details>
### Table SQL
With Table SQL, data is extracted from multiple source databases (such as TDengine, MySQL, Oracle, etc.), custom operator logic is applied (data cleansing, format conversion, joining data from different tables, etc.), and the processed results are then loaded into the target data source (such as TDengine, MySQL, etc.).
#### Table Source Connector
Parameter descriptions:
| Parameter | Type | Description |
| ----------------------- | :-----: | ------------ |
| connector | string | Connector identifier; set to `tdengine-connector`. |
| td.jdbc.url | string | Connection URL. |
| td.jdbc.mode | string | Connector type; set to `source` or `sink`. |
| table.name | string | Name of the source or target table. |
| scan.query | string | SQL statement used to fetch data. |
| sink.db.name | string | Target database name. |
| sink.supertable.name | string | Name of the super table to write to. |
| sink.batch.size | integer | Write batch size. |
| sink.table.name | string | Name of the regular table or subtable to write to. |
Usage example:
Write the subtable data of the meters table in the power database into the subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Table Source</summary>
```java
{{#include docs/examples/flink/Main.java:source_table}}
```
</details>
#### Table CDC Connector
Parameter descriptions:
| Parameter | Type | Description |
| ----------------------- | :-----: | ------------ |
| connector | string | Connector identifier; set to `tdengine-connector`. |
| user | string | User name, default root. |
| password | string | Password, default taosdata. |
| bootstrap.servers | string | Server address. |
| topic | string | Topic to subscribe to. |
| td.jdbc.mode | string | Connector type; set to `cdc` or `sink`. |
| group.id | string | Consumer group ID; consumers in the same group share the consumption progress. |
| auto.offset.reset | string | Initial position of the consumer group subscription.<br/>`earliest`: subscribe from the beginning.<br/>`latest`: subscribe only to new data.<br/>Default `latest`. |
| poll.interval_ms | integer | Polling interval, default 500 ms. |
| sink.db.name | string | Target database name. |
| sink.supertable.name | string | Name of the super table to write to. |
| sink.batch.size | integer | Write batch size. |
| sink.table.name | string | Name of the regular table or subtable to write to. |
Usage example:
Subscribe to the subtable data of the meters super table in the power database and write it into the subtables of the sink_meters super table in the power_sink database.
<details>
<summary>Table CDC</summary>
```java
{{#include docs/examples/flink/Main.java:cdc_table}}
```
</details>

View File

@ -256,7 +256,7 @@ charset 的有效值是 UTF-8。
|slowLogMaxLen |After 3.3.3.0|Dynamically modifiable, takes effect immediately |Maximum length of slow query logs, range 1-16384, default 4096|
|slowLogScope |After 3.3.3.0|Dynamically modifiable, takes effect immediately |Type of slow query records, range ALL/QUERY/INSERT/OTHERS/NONE, default QUERY|
|slowLogExceptDb |After 3.3.3.0|Dynamically modifiable, takes effect immediately |Database excluded from slow query reporting; only one database can be configured|
|debugFlag | |Dynamically modifiable, takes effect immediately |Log level switch: 131 outputs error and warning logs, 135 outputs error, warning, and debug logs, 143 outputs error, warning, debug, and trace logs. Default 131 or 135 (depending on the module)|
|debugFlag | |Dynamically modifiable, takes effect immediately |Log level switch: 131 outputs error and warning logs, 135 outputs error, warning, and debug logs, 143 outputs error, warning, debug, and trace logs. Default 131 or 135 (depending on the module). Setting this parameter affects the switches of all modules; the value set last takes effect|
|tmrDebugFlag | |Dynamically modifiable, takes effect immediately |Log switch of the timer module; same range as above|
|uDebugFlag | |Dynamically modifiable, takes effect immediately |Log switch of the common utility module; same range as above|
|rpcDebugFlag | |Dynamically modifiable, takes effect immediately |Log switch of the rpc module; same range as above|

View File

@ -33,6 +33,7 @@ TDengine 的 JDBC 驱动实现尽可能与关系型数据库驱动保持一致
| taos-jdbcdriver Version | Major Changes | TDengine Version |
| ------------------| ---------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- |
| 3.5.1 | Fixed the object type of timestamps obtained via data subscription | - |
| 3.5.0 | 1. Optimized parameter-binding performance over WebSocket connections and added support for binary data in parameter-binding queries <br/> 2. Optimized small-query performance over WebSocket connections <br/> 3. Added support for setting the time zone and application info on WebSocket connections | 3.3.5.0 and later |
| 3.4.0 | 1. Replaced the fastjson library with jackson <br/> 2. WebSocket uses a separate protocol identifier <br/> 3. Optimized background pull-thread usage to avoid timeouts caused by misuse | - |
| 3.3.4 | Fixed getInt errors when the data type is float | - |

View File

@ -302,3 +302,13 @@ TDinsight插件中展示的数据是通过taosKeeper和taosAdapter服务收集
Impact: After the server restarts, the original database appears to be lost (note: it is not actually lost; the original data disk is simply not mounted and is temporarily invisible) and the cluster ID changes, so the original database cannot be accessed. For Enterprise Edition users whose license is bound to the cluster ID, the machine code of the cluster servers is unchanged but the original authorization becomes invalid. If the problem is not monitored or handled in time, the user will not notice that the original database has been lost, causing losses and increasing operations costs.
Resolution: Configure automatic mounting of the dataDir directory in the fstab file so that dataDir always points to the expected mount point and directory; after restarting the server, the original database and cluster will be recovered. In a later version we will add a feature that makes taosd exit during startup, with an appropriate error message, when it detects that dataDir changed across the restart.
### 33 How to resolve the missing MVCP1400.DLL error when running TDengine on Windows?
1. Reinstall Microsoft Visual C++ Redistributable: since msvcp140.dll is part of the Microsoft Visual C++ Redistributable, reinstalling this package usually solves most problems. Download the appropriate version from the official Microsoft website.
2. Manually download and replace the msvcp140.dll file: download msvcp140.dll from a reliable source and copy it to the corresponding system directory. Make sure the file matches your system architecture (32-bit or 64-bit) and comes from a trusted source.
### 34 Which is faster: querying subtable data through a super table with TAG filtering, or querying the subtable directly?
Querying the subtable directly is faster. Querying subtable data through a super table with TAG filtering exists for convenience and to filter data across multiple subtables at once. If the goal is performance and the target subtable is already known, querying the subtable directly gives better performance.
### 35 How to view data compression ratio metrics?
TDengine currently only provides compression ratios on a per-table basis; database-level and overall ratios are not yet available. To view it, run `SHOW TABLE DISTRIBUTED table_name;` in the taos CLI, where table_name is the table whose compression ratio you want to check; it can be a super table, a regular table, or a subtable. Details can be found [here](https://docs.taosdata.com/reference/taos-sql/show/#show-table-distributed).
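A minimal JDBC sketch of running this command from an application (the URL and table name are placeholders) and printing the per-table report, which includes the compression ratio:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ShowCompressionRatio {
    public static void main(String[] args) throws Exception {
        Class.forName("com.taosdata.jdbc.ws.WebSocketDriver");
        String url = "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW TABLE DISTRIBUTED meters;")) {
            while (rs.next()) {
                System.out.println(rs.getString(1)); // each row is one line of the distribution report
            }
        }
    }
}
```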

View File

@ -24,6 +24,10 @@ TDengine 3.x 各版本安装包下载链接如下:
import Release from "/components/ReleaseV3";
## 3.3.5.0
<Release type="tdengine" version="3.3.5.0" />
## 3.3.4.8
<Release type="tdengine" version="3.3.4.8" />

View File

@ -0,0 +1,84 @@
---
title: Release Notes for 3.3.5.0
sidebar_label: 3.3.5.0
description: Release notes for version 3.3.5.0
---
## Features
1. feat: improved MQTT stability and performance
2. feat: taosX incremental backup and restore
3. feat: JDBC WebSocket connections support the STMT2 interface
4. feat: the Rust connector supports the STMT2 interface
5. feat: error codes are included in error messages in the taos CLI
6. feat: the Python connector integrates with SuperSet
7. feat: Explorer can configure Grafana dashboards
8. feat: taosX-agent supports configuring the in-memory cache queue length
## Optimizations
1. enh: adjusted the telemetry reporting mechanism
2. enh: support querying the disk space used by a specified database via SQL
3. enh: added query memory control on the server side
4. enh: the INTERVAL clause allows the AUTO keyword to specify an automatically computed window offset
5. enh: reduced the impact on write performance when migrating data across multi-level storage
6. enh: migrated the Grafana plugin UI to React to fully support version 11.3.0
7. enh: optimized the taosAdapter WebSocket interface
8. enh: added health status to taosX
9. enh: taosX supports configurable handling of abnormal data
10. enh: support setting options for client connections, including time zone, charset, user IP, and user name
11. enh: taosdump supports automatic reconnection when a query times out or the connection is dropped
12. enh: allow creating indexes on tag columns that are already subscribed
13. enh: taosX supports passwords containing special characters
14. enh: improved write performance when the Last cache is enabled
15. enh: the COMPACT command supports automatic execution, concurrency settings, and progress observation
16. enh: support modifying and persisting global configuration parameters via SQL statements
17. enh: updated the default compression algorithms for each data type, improving the compression ratio in most scenarios
18. enh: fixed the behavior of the --nodrop option of taosBenchmark on Mac/Windows
19. enh: Compact and replica-change operations on a database can no longer run at the same time (Enterprise Edition)
20. enh: taosdump supports exporting tables with composite primary keys
21. enh: the show queries and show connections statements display the user IP and user name in their results
22. enh: JDBC supports batch writes across multiple tables
23. enh: support dynamically adjusting the dataDir parameter for multi-level storage
24. enh: taosX database files use data_dir by default
25. enh: enforce strong passwords: 8 to 16 characters, containing at least three of uppercase letters, lowercase letters, digits, and special characters
26. enh: improved the speed at which clients obtain the new Leader
27. enh: OPC point regular-expression matching supports negation
## Fixes
1. fix: stream computing may deadlock when updating checkpoints under high load
2. fix: TMQ synchronization to the target end reports an error and cannot recover
3. fix: taosd fails to start when a data block in the WAL fails checksum verification
4. fix: taosBenchmark write failures when a node goes down with multiple replicas
5. fix: log files may be lost when log file rotation is frequent
6. fix: data updates within a window cause stream computing to stop
7. fix: libtaosws.so sets the wrong error code when the connection is interrupted while reading data
8. fix: OPC data points starting with @ are parsed incorrectly
9. fix: resolved the show vgroups permission issue for taosBenchmark on cloud services
10. fix: taosX reports a syntax error when migrating extremely wide super tables with column compression
11. fix: incorrect estimation of Vnode memory usage
12. fix: union all queries on varchar-type constant strings fail
13. fix: switching the leader during a transaction causes an mnode deadlock
14. fix: ws_stmt_get_tag_fields returns an invalid address
15. fix: execution errors when a UNION statement contains subqueries with multiple NULLs
16. fix: pausing a stream computing task may fail
17. fix: writing data via SQL into a subtable whose name is 192 characters long may cause an error when the table name is enclosed in backticks (`)
18. fix: when performing a join query on super tables across different databases, if each database contains only one vnode, the query returns an error
19. fix: taosX panics when disk space is insufficient, leaving the task unrecoverable
20. fix: mixing bound and unbound write methods when writing data to a super table triggers an exception
21. fix: in special cases, missing metrics cause a panic when the taosX agent connects
22. fix: taosd may crash when creating an index on a tag with a large character length
23. fix: taosd may crash when the input parameters of the functions first, last, last_row, and char exceed 127 https://github.com/taosdata/TDengine/issues/29241
24. fix: when the result set of a LIMIT statement exceeds the size of a single data block, the number of returned rows does not match the expectation
25. fix: when synchronizing data between clusters, deleting the target task may cause the source cluster to run out of memory
26. fix: a metadata read-write lock misconfiguration could, in rare cases, block writes
27. fix: importing a CSV file with INSERT INTO on Windows may loop forever reading the file when it lacks a trailing newline
28. fix: after the tags of the source table are updated, stream computing fails to recognize and apply the new tag values
29. fix: adjusted Kafka subscription parameters to improve Kafka write performance and stability
30. fix: query results are incorrect when an SQL query contains both an is null condition and an invalid in condition https://github.com/taosdata/TDengine/issues/29067
31. fix: query results are incorrect when an SQL query contains both in and between conditions https://github.com/taosdata/TDengine/issues/28989
32. fix: incorrect results when multiplying or dividing timestamp and numeric types https://github.com/taosdata/TDengine/issues/28339
33. fix: data type conversion errors in the IN clause lead to incorrect query results https://github.com/taosdata/TDengine/issues/29047 https://github.com/taosdata/TDengine/issues/28902
34. fix: incorrect filtering results when constant conditions are combined with the OR operator https://github.com/taosdata/TDengine/issues/28904
35. fix: negative values are not handled when subtracting timestamp values https://github.com/taosdata/TDengine/issues/28906
36. fix: some tag values are displayed incorrectly when using GROUP BY on a tag
37. fix: a bug in old GCC versions causes compilation failures

View File

@ -4,6 +4,7 @@ sidebar_label: 版本说明
description: Release notes for each version
---
[3.3.5.0](./3.3.5.0)
[3.3.4.8](./3.3.4.8)
[3.3.4.3](./3.3.4.3)
[3.3.3.0](./3.3.3.0)

View File

@ -1,4 +1,8 @@
#!/bin/bash
# Run cleanup function on exit
trap cleanup EXIT
# define default timezone
DEFAULT_TIMEZONE="Asia/Shanghai"
@ -64,6 +68,9 @@ PROCESS_EXPORTER_BINARY="/usr/local/bin/process-exporter"
# Define fstab input
FSTAB_LINE="share-server.platform.tdengine.dev:/mnt/share_server /mnt/share_server nfs rw,sync,_netdev 0 0"
# Results need to be stored when source
SOURCE_RESULTS=""
# ANSI color codes
GREEN='\033[0;32m' # Green color
RED='\033[0;31m' # Red color
@ -146,9 +153,14 @@ help() {
echo " config_coredump - Configure core dump settings"
echo " disable_service - Disable specified services"
echo " install_python - Install Python and pip"
echo " install_pyenv - Install Pyenv"
echo " install_python_via_pyenv - Install Python via pyenv"
echo " install_java - Install Java"
echo " install_maven - Install Maven"
echo " install_java_via_sdkman - Install Java via sdkman"
echo " install_maven_via_sdkman - Install Maven via sdkman"
echo " deploy_go - Deploy Go environment"
echo " install_gvm - Install GVM"
echo " install_go_via_gvm - Install Go via GVM"
echo " deploy_rust - Deploy Rust environment"
echo " install_node - Install Node via package manager or binary"
echo " install_node_via_nvm - Install Node via NVM"
@ -786,7 +798,7 @@ update_redhat_gcc() {
update_redhat_tmux() {
echo "Downloading the latest version of tmux..."
cd /usr/local/src || exit
latest_tmux_version=$(curl -s https://api.github.com/repos/tmux/tmux/releases/latest | grep -Po '"tag_name": "\K.*?(?=")')
latest_tmux_version=$(curl --retry 10 --retry-delay 5 --retry-max-time 120 -s https://api.github.com/repos/tmux/tmux/releases/latest | grep -Po '"tag_name": "\K.*?(?=")')
wget https://github.com/tmux/tmux/releases/download/"${latest_tmux_version}"/tmux-"${latest_tmux_version}".tar.gz
echo "Extracting tmux ${latest_tmux_version}..."
@ -838,6 +850,7 @@ deploy_tmux() {
# }
# Install Java
# shellcheck disable=SC2120
install_java() {
echo -e "${YELLOW}Installing Java...${NO_COLOR}"
# Specify the major JDK version to search for; default is set to 17 if not specified
@ -917,6 +930,7 @@ install_java() {
INSTALLED_VERSION=$("$JAVA_HOME"/bin/java --version 2>&1)
if echo "$INSTALLED_VERSION" | grep -q "openjdk $DEFAULT_JDK_VERSION"; then
echo -e "${GREEN}Java installed successfully.${NO_COLOR}"
SOURCE_RESULTS+="source /root/.bashrc # For openjdk\n"
else
echo -e "${YELLOW}Java version not match.${NO_COLOR}"
exit 1
@ -925,35 +939,118 @@ install_java() {
# Install sdkman
install_sdkman() {
install_package zip unzip
echo -e "${YELLOW}Installing SDKMAN...${NO_COLOR}"
if [ -d "$HOME/.sdkman" ]; then
echo -e "${GREEN}SDKMAN is already installed.${NO_COLOR}"
else
echo -e "${YELLOW}Installing SDKMAN...${NO_COLOR}"
curl -s "https://get.sdkman.io" | bash
install_package zip unzip
curl --retry 10 --retry-delay 5 --retry-max-time 120 -s "https://get.sdkman.io" | bash
fi
}
# Install gvm
install_gvm() {
echo -e "${YELLOW}Installing GVM...${NO_COLOR}"
if [ -d "$HOME/.gvm" ]; then
echo -e "${GREEN}GVM is already installed.${NO_COLOR}"
else
install_package bison gcc make
bash < <(curl --retry 10 --retry-delay 5 --retry-max-time 120 -s -S -L https://raw.githubusercontent.com/moovweb/gvm/master/binscripts/gvm-installer)
source $HOME/.gvm/scripts/gvm
gvm version
check_status "Failed to install GVM" "GVM installed successfully." $?
add_config_if_not_exist "export GO111MODULE=on" "$BASH_RC"
add_config_if_not_exist "export GOPROXY=https://goproxy.cn,direct" "$BASH_RC"
add_config_if_not_exist "export GO_BINARY_BASE_URL=https://mirrors.aliyun.com/golang/" "$BASH_RC"
add_config_if_not_exist "export GOROOT_BOOTSTRAP=$GOROOT" "$BASH_RC"
fi
SOURCE_RESULTS+="source $HOME/.gvm/scripts/gvm # For gvm\n"
}
# enable pyenv
enable_pyenv() {
export PATH="$HOME/.pyenv/bin:$PATH"
eval "$(pyenv init --path)"
eval "$(pyenv init -)"
}
# Install pyenv
install_pyenv() {
echo -e "${YELLOW}Installing Pyenv...${NO_COLOR}"
if [ -d "$HOME/.pyenv" ]; then
echo -e "${GREEN}Pyenv is already installed.${NO_COLOR}"
else
curl -L https://gitee.com/xinghuipeng/pyenv-installer/raw/master/bin/pyenv-installer | bash
enable_pyenv
add_config_if_not_exist "export PATH=\"\$HOME/.pyenv/bin:\$PATH\"" "$BASH_RC"
add_config_if_not_exist "eval \"\$(pyenv init --path)\"" "$BASH_RC"
add_config_if_not_exist "eval \"\$(pyenv init -)\"" "$BASH_RC"
pyenv --version
check_status "Failed to install Pyenv" "Pyenv installed successfully." $?
fi
SOURCE_RESULTS+="source $BASH_RC For: pyenv/python\n"
}
# Install python via pyenv
install_python_via_pyenv() {
echo -e "${YELLOW}Installing Python via Pyenv...${NO_COLOR}"
if [ -f /etc/debian_version ]; then
install_package gcc make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev
elif [ -f /etc/redhat-release ]; then
install_package gcc zlib zlib-devel libffi libffi-devel readline-devel openssl-devel openssl11 openssl11-devel
else
echo "Unsupported Linux distribution."
exit 1
fi
if [ -n "$1" ]; then
DEFAULT_PYTHON_VERSION="$1"
else
DEFAULT_PYTHON_VERSION="3.10.12"
fi
install_pyenv
enable_pyenv
pyenv install "$DEFAULT_PYTHON_VERSION"
pyenv global "$DEFAULT_PYTHON_VERSION"
python --version
check_status "Failed to install Python" "Python installed successfully." $?
}
# Install Maven
# shellcheck disable=SC2120
install_maven() {
install_maven_via_sdkman() {
echo -e "${YELLOW}Installing maven...${NO_COLOR}"
if [ -n "$1" ]; then
DEFAULT_MVN_VERSION="$1"
install_sdkman
if [ -f "$HOME/.sdkman/bin/sdkman-init.sh" ]; then
source "$HOME/.sdkman/bin/sdkman-init.sh"
fi
[[ -s "$HOME/.sdkman/bin/sdkman-init.sh" ]] && source "$HOME/.sdkman/bin/sdkman-init.sh"
# 3.2.5
sdk install maven "$DEFAULT_MVN_VERSION"
yes | sdk install maven "$DEFAULT_MVN_VERSION"
else
install_package "maven"
fi
[[ -s "$HOME/.sdkman/bin/sdkman-init.sh" ]] && source "$HOME/.sdkman/bin/sdkman-init.sh"
mvn -version
check_status "Failed to install maven" "Maven installed successfully." $?
}
install_java_via_sdkman() {
echo -e "${YELLOW}Installing java...${NO_COLOR}"
if [ -n "$1" ]; then
DEFAULT_JDK_VERSION="$1"
else
DEFAULT_JDK_VERSION="17"
fi
install_sdkman
[[ -s "$HOME/.sdkman/bin/sdkman-init.sh" ]] && source "$HOME/.sdkman/bin/sdkman-init.sh"
yes | sdk install java "$DEFAULT_JDK_VERSION-open"
[[ -s "$HOME/.sdkman/bin/sdkman-init.sh" ]] && source "$HOME/.sdkman/bin/sdkman-init.sh"
java -version
check_status "Failed to install java" "Java installed successfully." $?
SOURCE_RESULTS+="source $HOME/.sdkman/bin/sdkman-init.sh # For sdkman/java/maven\n"
}
# Install Go
deploy_go() {
# Define the installation location for Go
@ -987,6 +1084,31 @@ deploy_go() {
# Apply the environment variables
$GO_INSTALL_DIR/bin/go version
check_status "Failed to install GO" "Install GO successfully" $?
SOURCE_RESULTS+="source $BASH_RC # For golang\n"
}
# Install Go via gvm
install_go_via_gvm() {
echo -e "${YELLOW}Installing Go...${NO_COLOR}"
if [ -n "$1" ]; then
DEFAULT_GO_VERSION="$1"
else
DEFAULT_GO_VERSION="1.23.0"
fi
install_gvm
source $HOME/.gvm/scripts/gvm
export GO111MODULE=on
export GOPROXY=https://goproxy.cn,direct
export GO_BINARY_BASE_URL=https://mirrors.aliyun.com/golang/
export GOROOT_BOOTSTRAP=$GOROOT
gvm install go"$DEFAULT_GO_VERSION" -B
gvm use go"$DEFAULT_GO_VERSION"
gvm use go"$DEFAULT_GO_VERSION" --default
go version
check_status "Failed to install Go" "Go installed successfully." $?
SOURCE_RESULTS+="source $BASH_RC # For golang\n"
}
# Function to install Rust and Cargo
@ -1037,6 +1159,7 @@ deploy_rust() {
# Install cargo-make
cargo install cargo-make
check_status "Failed to install Rust" "Install Rust successfully" $?
SOURCE_RESULTS+="source $BASH_RC && source $HOME/.cargo/env # For cargo/rust\n"
else
echo "Rust is already installed."
fi
@ -1063,7 +1186,7 @@ install_node_in_ubuntu18.04() {
NODE_DISTRO="node-v$DEFAULT_NODE_VERSION-linux-x64"
update_ubuntu_gcc_18.04
echo "Installing Node..."
curl -O https://nodejs.org/dist/v22.0.0/node-v22.0.0.tar.gz
curl --retry 10 --retry-delay 5 --retry-max-time 120 -O https://nodejs.org/dist/v22.0.0/node-v22.0.0.tar.gz
tar -xzf node-v22.0.0.tar.gz
cd node-v22.0.0 || exit
./configure
@ -1115,12 +1238,13 @@ install_node_via_nvm () {
# Install NVM
if ! command -v nvm &> /dev/null; then
NVM_VERSION=$(curl -s https://api.github.com/repos/nvm-sh/nvm/releases/latest | grep -oP '"tag_name": "\K(.*)(?=")')
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/"$NVM_VERSION"/install.sh | bash
NVM_VERSION=$(curl --retry 10 --retry-delay 5 --retry-max-time 120 -s https://api.github.com/repos/nvm-sh/nvm/releases/latest | grep -oP '"tag_name": "\K(.*)(?=")')
curl --retry 10 --retry-delay 5 --retry-max-time 120 -o- https://raw.githubusercontent.com/nvm-sh/nvm/"$NVM_VERSION"/install.sh | bash
export NVM_DIR="$HOME/.nvm"
[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh"
[ -s "$NVM_DIR/bash_completion" ] && \. "$NVM_DIR/bash_completion"
echo -e "${GREEN}NVM installed successfully.${NO_COLOR}"
SOURCE_RESULTS+="source $NVM_DIR/nvm.sh && source $NVM_DIR/bash_completion # For nvm/node/npm/yarn/pnpm\n"
else
echo -e "${GREEN}NVM is already installed.${NO_COLOR}"
fi
@ -1655,6 +1779,19 @@ config_cloud_init() {
# cloud-init clean --logs
}
cleanup() {
if [ -n "$SOURCE_RESULTS" ]; then
echo -e "${YELLOW}===========================================\n${NO_COLOR}"
echo -e "${YELLOW}Installation complete! \n${NO_COLOR}"
echo -e "${YELLOW}Some tools require you to manually source${NO_COLOR}"
echo -e "${YELLOW}or restart your terminal to take effect.\n${NO_COLOR}"
echo -e "${YELLOW}===========================================\n${NO_COLOR}"
echo -e "${YELLOW}$SOURCE_RESULTS${NO_COLOR}"
else
echo -e "${YELLOW}Installation complete \n${NO_COLOR}"
fi
}
# Clone a repository with a specified target directory
clone_repo_with_rename() {
local repo_url="$1"
@ -1751,16 +1888,17 @@ clone_repos() {
new_funcs() {
echo "Adding test..."
install_python 3.10.12
# install_java 21
# install_node 16.20.2
# install_maven 3.2.5
install_python_via_pyenv 3.10.12
install_java_via_sdkman 21.0.2
install_node 16.20.2
install_maven_via_sdkman 3.2.5
deploy_rust
}
# deploy TDasset
TDasset() {
install_java 21
install_maven 3.9.9
install_java_via_sdkman 21.0.2
install_maven_via_sdkman 3.9.9
# not supported in centos7/ubuntu18 because of the old version of glibc
install_node_via_nvm 22.0.0
install_pnpm
@ -1768,16 +1906,17 @@ TDasset() {
# deploy TDinternal/TDengine/taosx
TDinternal() {
deploy_go
install_go_via_gvm 1.23.3
deploy_rust
install_java 17
install_java_via_sdkman 17
install_maven_via_sdkman 3.9.9
install_node_via_nvm 16.20.2
install_python 3.10.12
install_python_via_pyenv 3.10.12
}
# deploy TDgpt
TDgpt() {
install_python 3.10.12
install_python_via_pyenv 3.10.12
}
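
Each project target (TDasset, TDinternal, TDgpt) is a thin wrapper that composes the *_via_* installers with pinned versions. A new target can follow the same pattern; the function name and versions below are purely illustrative:

my_project() {
    install_python_via_pyenv 3.10.12
    install_java_via_sdkman 17
    install_node_via_nvm 18.20.0
}
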
# deploy taos-test-framework
@ -1852,7 +1991,7 @@ deploy_dev() {
install_python
install_pip_pkg
install_java
install_maven
install_maven_via_sdkman
deploy_go
deploy_rust
install_node
@ -1908,6 +2047,9 @@ main() {
replace_sources)
replace_sources
;;
update)
update
;;
upgrade)
upgrade
;;
@ -1935,14 +2077,23 @@ main() {
install_python)
install_python
;;
install_pyenv)
install_pyenv
;;
install_python_via_pyenv)
install_python_via_pyenv
;;
install_pip_pkg)
install_pip_pkg
;;
install_java)
install_java
;;
install_maven)
install_maven
install_java_via_sdkman)
install_java_via_sdkman
;;
install_maven_via_sdkman)
install_maven_via_sdkman
;;
deploy_cmake)
deploy_cmake
@ -1959,6 +2110,12 @@ main() {
deploy_go)
deploy_go
;;
install_gvm)
install_gvm
;;
install_go_via_gvm)
install_go_via_gvm
;;
deploy_rust)
deploy_rust
;;
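
main()'s case statement dispatches the requested command to the matching function, so each helper added above (update, install_pyenv, install_python_via_pyenv, install_java_via_sdkman, install_maven_via_sdkman, install_gvm, install_go_via_gvm) can presumably be invoked directly from the command line. A sketch, assuming the script is saved as setup_env.sh (the file name here is illustrative):

bash setup_env.sh install_go_via_gvm         # Go toolchain via gvm
bash setup_env.sh install_python_via_pyenv   # Python via pyenv
bash setup_env.sh deploy_rust                # Rust plus cargo-make
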

View File

@ -1047,8 +1047,6 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableSinkInfo->uid = 0;
code = doPutSinkTableInfoIntoCache(pTask->outputInfo.tbSink.pTbInfo, pTableSinkInfo, groupId, id);
} else {
metaReaderClear(&mr);
tqError("s-task:%s vgId:%d dst-table:%s not auto-created, and not create in tsdb, discard data", id, vgId,
dstTableName);
return TSDB_CODE_TDB_TABLE_NOT_EXIST;

View File

@ -1808,7 +1808,7 @@ int stateKeyDecode(void* k, char* buf) {
return p - buf;
}
int stateKeyToString(void* k, char* buf) {
int32_t stateKeyToString(void* k, char* buf) {
SStateKey* key = k;
int n = 0;
n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);

View File

@ -250,9 +250,10 @@ _EXIT:
streamBackendCleanup((void*)pBackend);
if (code == 0) {
char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32);
int32_t len = strlen(pMeta->path) + 32;
char* state = taosMemoryCalloc(1, len);
if (state != NULL) {
sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
(void) snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
taosRemoveDir(state);
taosMemoryFree(state);
} else {
@ -379,7 +380,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
char* tpath = taosMemoryCalloc(1, len);
TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
(void) snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
code = streamMetaOpenTdb(pMeta);

View File

@ -119,23 +119,21 @@ static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname)
#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \
do { \
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
} while (0)
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
int32_t ret = 0;
int32_t len = strlen(path) + 32;
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
char* fullname = taosMemoryCalloc(1, len);
if (fullname == NULL) {
stError("failed to get file:%s size, code: out of memory", name);
return terrno;
}
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
ret = snprintf(fullname, len, "%s%s%s", path, TD_DIRSEP, name);
if (ret < 0 || ret >= len) {
stError("%s failed to set the file path for get the file size, code: out of buffer", name);
return TSDB_CODE_OUT_OF_BUFFER;
}
ret = taosStatFile(fullname, sz, NULL, NULL);
taosMemoryFree(fullname);
@ -146,7 +144,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
(void) snprintf(fullname, tListLen(fullname),"%s%s%s", path, TD_DIRSEP, name);
return taosOpenFile(fullname, opt);
}
@ -155,35 +153,74 @@ int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { retur
int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
int16_t cap = 512;
int16_t cap = 512;
if (qDebugFlag & DEBUG_DEBUG) {
char* buf = taosMemoryCalloc(1, cap);
if (buf == NULL) {
stError("%s failed to alloc memory, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return;
}
int32_t nBytes = snprintf(buf + strlen(buf), cap, "[");
if (nBytes <= 0 || nBytes >= cap) {
taosMemoryFree(buf);
stError("%s failed to write buf, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_RANGE));
return;
}
int32_t len = 0;
int32_t wlen = 1;
if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
if (pSnapFile->pSst) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name);
do {
buf[0] = '[';
if (pSnapFile->pCurrent) {
len = snprintf(buf + wlen, cap - wlen, "current: %s,", pSnapFile->pCurrent);
if (len > 0 && len < (cap - wlen)) {
wlen += len;
} else {
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
break;
}
}
if (pSnapFile->pMainfest) {
len = snprintf(buf + wlen, cap - wlen, "MANIFEST: %s,", pSnapFile->pMainfest);
if (len > 0 && len < (cap - wlen)) {
wlen += len;
} else {
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
break;
}
}
if (pSnapFile->pOptions) {
len = snprintf(buf + wlen, cap - wlen, "options: %s,", pSnapFile->pOptions);
if (len > 0 && len < (cap - wlen)) {
wlen += len;
} else {
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
break;
}
}
if (pSnapFile->pSst) {
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
if (strlen(buf) + strlen(name) < cap) {
len = snprintf(buf + wlen, cap - wlen, "%s,", name);
if (len > 0 && len < (cap - wlen)) {
wlen += len;
} else {
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
break;
}
}
}
}
} while (0);
if (wlen < cap) {
buf[wlen] = ']';
}
if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]");
buf[cap - 1] = '\0';
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
taosMemoryFree(buf);
}
}
@ -771,16 +808,23 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
if (pDbSnapFile->inited == 0) {
char idstr[64] = {0};
sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
(void)snprintf(idstr, tListLen(idstr), "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
int32_t bufLen = strlen(pHandle->metaPath) + 256;
char* path = taosMemoryCalloc(1, bufLen);
if (path == NULL) {
stError("s-task:0x%x failed to prepare meta header buffer, code:Out of memory", (int32_t) snapInfo.taskId);
return terrno;
}
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", snapInfo.chkpId);
int32_t ret = snprintf(path, bufLen, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP,
"checkpoints", TD_DIRSEP, "checkpoint", snapInfo.chkpId);
if (ret < 0 || ret >= bufLen) {
stError("s-task:0x%x failed to set the path for take snapshot, code: out of buffer, %s", (int32_t)snapInfo.taskId,
pHandle->metaPath);
return TSDB_CODE_OUT_OF_BUFFER;
}
if (!taosIsDir(path)) {
code = taosMulMkDir(path);
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);

View File

@ -132,8 +132,12 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
return code;
}
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
char buf[128] = {0};
int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
if (ret < 0 || ret >= tListLen(buf)) {
stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
return TSDB_CODE_OUT_OF_BUFFER;
}
pTask->id.idStr = taosStrdup(buf);
if (pTask->id.idStr == NULL) {
@ -402,7 +406,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
}
char id[128] = {0};
int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
if (nBytes < 0 || nBytes >= sizeof(id)) {
return TSDB_CODE_OUT_OF_BUFFER;
}
@ -413,10 +417,14 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
return terrno;
}
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
return 0;
int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
if (code < 0 || code >= len + nBytes + 2) {
stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
return TSDB_CODE_OUT_OF_BUFFER;
} else {
stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
return 0;
}
}
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
@ -1129,7 +1137,11 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId);
if (code < 0 || code >= tListLen(buf)) {
return TSDB_CODE_OUT_OF_BUFFER;
}
*pId = taosStrdup(buf);
if (*pId == NULL) {

286
tests/perf-test/stream.py Normal file
View File

@ -0,0 +1,286 @@
import json
import subprocess
import psutil
import time
import taos
class MonitorSystemLoad:
def __init__(self, name, count) -> None:
self.pid = self.get_pid_by_name(name)
self.count = count
def get_pid_by_name(self, name):
for proc in psutil.process_iter(['pid', 'name']):
if proc.info['name'] == name:
return proc.info['pid']
return None
def get_proc_status(self):
process = psutil.Process(self.pid)
while True:
cpu_percent = process.cpu_percent(interval=1)
memory_info = process.memory_info()
memory_percent = process.memory_percent()
io_counters = process.io_counters()
sys_load = psutil.getloadavg()
print("load: %s, CPU:%s, Mem:%.2f MiB(%.2f%%), Read: %.2fMiB(%d), Write: %.2fMib (%d)" % (
sys_load, cpu_percent, memory_info.rss / 1048576.0,
memory_percent, io_counters.read_bytes / 1048576.0, io_counters.read_count,
io_counters.write_bytes / 1048576.0, io_counters.write_count))
time.sleep(1)
self.count -= 1
if self.count <= 0:
break
class StreamStarter:
def __init__(self) -> None:
self.sql = None
self.host='127.0.0.1'
self.user = 'root'
self.passwd = 'taosdata'
self.conf = '/etc/taos/taos.cfg'
self.tz = 'Asia/Shanghai'
def prepare_data(self) -> dict:
json_data = {
"filetype": "insert",
"cfgdir": "/etc/taos/cfg",
"host": "127.0.0.1",
"port": 6030,
"rest_port": 6041,
"user": "root",
"password": "taosdata",
"thread_count": 20,
"create_table_thread_count": 40,
"result_file": "/tmp/taosBenchmark_result.log",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 10000,
"max_sql_len": 1024000,
"databases": [
{
"dbinfo": {
"name": "stream_test",
"drop": "yes",
"replica": 1,
"duration": 10,
"precision": "ms",
"keep": 3650,
"minRows": 100,
"maxRows": 4096,
"comp": 2,
"vgroups": 10,
"stt_trigger": 1,
"WAL_RETENTION_PERIOD": 86400
},
"super_tables": [
{
"name": "stb",
"child_table_exists": "yes",
"childtable_count": 500,
"childtable_prefix": "ctb0_",
"escape_character": "no",
"auto_create_table": "yes",
"batch_create_tbl_num": 1000,
"data_source": "rand",
"insert_mode": "taosc",
"interlace_rows": 400,
"tcp_transfer": "no",
"insert_rows": 10000,
"partial_col_num": 0,
"childtable_limit": 0,
"childtable_offset": 0,
"rows_per_tbl": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"keep_trying": -1,
"timestamp_step": 1000,
"trying_interval": 10,
"start_timestamp": "2021-01-01 00:00:00",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [
{
"type": "INT",
"count": 1
},
{
"type": "TINYINT",
"count": 0
},
{
"type": "DOUBLE",
"count": 0
},
{
"type": "VARCHAR",
"count": 0,
"len": 16
},
{
"type": "NCHAR",
"count": 0,
"len": 4
},
{
"type": "SMALLINT",
"count": 0
},
{
"type": "BIGINT",
"count": 0
},
{
"type": "UTINYINT",
"count": 0
},
{
"type": "USMALLINT",
"count": 0
},
{
"type": "UINT",
"count": 0
},
{
"type": "UBIGINT",
"count": 0
},
{
"type": "FLOAT",
"count": 2
},
{
"type": "BINARY",
"count": 0,
"len": 8
},
{
"type": "BOOL",
"count": 0
},
{
"type": "TIMESTAMP",
"count": 1
}
],
"tags": [
{
"type": "INT",
"count": 0
},
{
"type": "TINYINT",
"count": 1
},
{
"type": "DOUBLE",
"count": 0
},
{
"type": "VARCHAR",
"count": 0,
"len": 8
},
{
"type": "NCHAR",
"count": 0,
"len": 16
},
{
"type": "SMALLINT",
"count": 0
},
{
"type": "BIGINT",
"count": 0
},
{
"type": "UTINYINT",
"count": 0
},
{
"type": "USMALLINT",
"count": 0
},
{
"type": "UINT",
"count": 0
},
{
"type": "UBIGINT",
"count": 0
},
{
"type": "FLOAT",
"count": 0
},
{
"type": "BINARY",
"count": 1,
"len": 16
},
{
"type": "BOOL",
"count": 0
},
{
"type": "TIMESTAMP",
"count": 0
}
]
}
]
}
],
"prepare_rand": 10000,
"chinese": "no",
"test_log": "/tmp/testlog/"
}
with open('/tmp/stream.json', 'w+') as f:
json.dump(json_data, f, indent=4)
def do_start(self):
self.prepare_data()
try:
subprocess.Popen('taosBenchmark --f /tmp/stream.json', stdout=subprocess.PIPE, shell=True, text=True)
except subprocess.CalledProcessError as e:
print(f"Error running Bash command: {e}")
conn = taos.connect(
host=self.host, user=self.user, password=self.passwd, config=self.conf, timezone=self.tz
)
time.sleep(10)
print("start to connect db")
cursor = conn.cursor()
cursor.execute('use stream_test')
sql = "create stream str1 ignore update 0 into str1_dst as select _wstart as wstart, min(c1),max(c2), count(c3) from stream_test.stb partition by cast(t1 as int) t1,tbname interval(5s)"
cursor.execute(sql)
print("create stream completed, start to monitor system load")
conn.close()
loader = MonitorSystemLoad('taosd', 80)
loader.get_proc_status()
if __name__ == "__main__":
StreamStarter().do_start()
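
The new tests/perf-test/stream.py drives taosBenchmark from a generated /tmp/stream.json, creates stream str1 over stream_test.stb, and then samples taosd CPU/memory/IO once per second for 80 iterations. A rough sketch of running it, assuming a local taosd is up and taosBenchmark is on PATH (the package names are the usual PyPI ones for the imports above):

pip3 install psutil taospy           # provides the psutil and taos modules imported by the script
python3 tests/perf-test/stream.py    # writes /tmp/stream.json, creates stream str1, then monitors taosd
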