Merge branch 'main' into merge/mainto3.02

Shengliang Guan 2025-02-08 11:12:14 +08:00
commit 1c8b1520d6
43 changed files with 379 additions and 184 deletions

View File

@ -75,4 +75,4 @@ available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.ht
[homepage]: https://www.contributor-covenant.org

For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

View File

@ -1,13 +0,0 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 3.0
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)

View File

@ -205,9 +205,18 @@ ENDIF()
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
-                WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
+                WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download"
+                RESULT_VARIABLE result)
+IF(NOT result EQUAL "0")
+    message(FATAL_ERROR "CMake step for downloading dependencies failed: ${result}")
+ENDIF()
execute_process(COMMAND "${CMAKE_COMMAND}" --build .
-                WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download")
+                WORKING_DIRECTORY "${TD_CONTRIB_DIR}/deps-download"
+                RESULT_VARIABLE result)
+IF(NOT result EQUAL "0")
+    message(FATAL_ERROR "CMake step for building dependencies failed: ${result}")
+ENDIF()

# ================================================================================================
# Build

View File

@ -31,12 +31,12 @@ There are many parameters for creating consumers, which flexibly support various
| Parameter Name | Type | Description | Remarks |
| :-----------------------: | :-----: | ------------------------------------------------------------ | ------------------------------------------------------------ |
-| `td.connect.ip` | string | Server IP address | |
+| `td.connect.ip` | string | FQDN of server | IP address or host name |
| `td.connect.user` | string | Username | |
| `td.connect.pass` | string | Password | |
| `td.connect.port` | integer | Server port number | |
-| `group.id` | string | Consumer group ID, the same consumer group shares consumption progress | <br />**Required**. Maximum length: 192.<br />Each topic can have up to 100 consumer groups |
+| `group.id` | string | Consumer group ID, the same consumer group shares consumption progress | <br />**Required**. Maximum length: 192; excess length will be cut off.<br />Each topic can have up to 100 consumer groups |
-| `client.id` | string | Client ID | Maximum length: 192 |
+| `client.id` | string | Client ID | Maximum length: 255; excess length will be cut off. |
| `auto.offset.reset` | enum | Initial position of the consumer group subscription | <br />`earliest`: default (version < 3.2.0.0); subscribe from the beginning; <br/>`latest`: default (version >= 3.2.0.0); only subscribe from the latest data; <br/>`none`: cannot subscribe without a committed offset |
| `enable.auto.commit` | boolean | Whether to enable automatic consumption point submission, true: automatic submission, client application does not need to commit; false: client application needs to commit manually | Default is true |
| `auto.commit.interval.ms` | integer | Time interval for automatically submitting consumption records, in milliseconds | Default is 5000 |
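These parameters are plain string key/value pairs, so they can be assembled ahead of time and handed to whichever consumer constructor the connector provides. A minimal sketch in Java (the host name and group values are placeholders; using `java.util.Properties` mirrors the TDengine Java connector's configuration style and is not a verbatim API from this page):

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("td.connect.ip", "tdengine.example.com"); // FQDN or IP of the server
        props.setProperty("td.connect.port", "6030");
        props.setProperty("td.connect.user", "root");
        props.setProperty("td.connect.pass", "taosdata");
        props.setProperty("group.id", "demo-group");    // required; at most 192 characters
        props.setProperty("client.id", "consumer-1");   // at most 255 characters
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }
}
```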

View File

@ -1,14 +1,19 @@
---
title: Data Backup and Restoration
-slug: /operations-and-maintenance/back-up-and-restore-data
+slug: /operations-and-maintenance/data-backup-and-restoration
---

-To prevent data loss and accidental deletions, TDengine provides comprehensive features such as data backup, restoration, fault tolerance, and real-time synchronization of remote data to ensure the security of data storage. This section briefly explains the backup and restoration functions.
+import Image from '@theme/IdealImage';
+import imgBackup from '../assets/data-backup-01.png';
+
+You can back up the data in your TDengine cluster and restore it in the event that data is lost or damaged.

## Data Backup and Restoration Using taosdump

taosdump is an open-source tool that supports backing up data from a running TDengine cluster and restoring the backed-up data to the same or another running TDengine cluster. taosdump can back up the database as a logical data unit or back up data records within a specified time period in the database. When using taosdump, you can specify the directory path for data backup. If no directory path is specified, taosdump will default to backing up the data in the current directory.

+### Back Up Data with taosdump
+
Below is an example of using taosdump to perform data backup.

```shell

@ -19,6 +24,8 @@ After executing the above command, taosdump will connect to the TDengine cluster

When using taosdump, if the specified storage path already contains data files, taosdump will prompt the user and exit immediately to avoid data overwriting. This means the same storage path can only be used for one backup. If you see related prompts, please operate carefully to avoid accidental data loss.

+### Restore Data with taosdump
+
To restore data files from a specified local file path to a running TDengine cluster, you can execute the taosdump command by specifying command-line parameters and the data file path. Below is an example code for taosdump performing data restoration.

```shell

@ -27,25 +34,62 @@ taosdump -i /file/path -h localhost -P 6030

After executing the above command, taosdump will connect to the TDengine cluster at localhost:6030 and restore the data files from /file/path to the TDengine cluster.

-## Data Backup and Restoration Based on TDengine Enterprise
+## Data Backup and Restoration in TDengine Enterprise

-TDengine Enterprise provides an efficient incremental backup feature, with the following process.
+TDengine Enterprise implements incremental backup and recovery of data by using data subscription. The backup and recovery functions of TDengine Enterprise include the following concepts:

-Step 1, access the taosExplorer service through a browser, usually at the port 6060 of the IP address where the TDengine cluster is located, such as `http://localhost:6060`.
+1. Incremental data backup: Based on TDengine's data subscription function, all data changes of **the backup object** (including addition, modification, deletion, and metadata change) are recorded to generate a backup file.
+2. Data recovery: Use the backup file generated by incremental data backup to restore **the backup object** to a specified point in time.
+3. Backup object: The object that the user backs up can be a **database** or a **supertable**.
+4. Backup plan: The user creates a periodic backup task for the backup object. The backup plan starts at a specified time point and periodically executes the backup task at intervals of **the backup cycle**. Each backup task generates a **backup point**.
+5. Backup point: Each time a backup task is executed, a set of backup files is generated. They correspond to a time point, called **a backup point**. The first backup point is called **the initial backup point**.
+6. Restore task: The user selects a backup point in the backup plan and creates a restore task. The restore task starts from **the initial backup point** and replays the data changes in **the backup files** one by one until the specified backup point is reached.

-Step 2, in the "System Management - Backup" page of the taosExplorer service, add a new data backup task, fill in the database name and backup storage file path in the task configuration information, and start the data backup after completing the task creation. Three parameters can be configured on the data backup configuration page:
-- Backup cycle: Required, configure the time interval for each data backup execution, which can be selected from a dropdown menu to execute once every day, every 7 days, or every 30 days. After configuration, a data backup task will be initiated at 0:00 of the corresponding backup cycle;
-- Database: Required, configure the name of the database to be backed up (the database's wal_retention_period parameter must be greater than 0);
-- Directory: Required, configure the path in the running environment of taosX where the data will be backed up, such as `/root/data_backup`;
+### Incremental Backup Example
+
+<figure>
+<Image img={imgBackup} alt="Incremental backup process"/>
+<figcaption>Figure 1. Incremental backup process</figcaption>
+</figure>

-Step 3, after the data backup task is completed, find the created data backup task in the list of created tasks on the same page, and directly perform one-click restoration to restore the data to TDengine.
+1. The user creates a backup plan to execute the backup task every 1 day starting from 2024-08-27 00:00:00.
+2. The first backup task was executed at 2024-08-27 00:00:00, generating an initial backup point.
+3. After that, the backup task is executed every 1 day, and multiple backup points are generated.
+4. Users can select a backup point and create a restore task.
+5. The restore task starts from the initial backup point, applies the backup points one by one, and restores to the specified backup point.

-Compared to taosdump, if the same data is backed up multiple times in the specified storage path, since TDengine Enterprise not only has high backup efficiency but also implements incremental processing, each backup task will be completed quickly. As taosdump always performs full backups, TDengine Enterprise can significantly reduce system overhead in scenarios with large data volumes and is more convenient.
+### Back Up Data in TDengine Enterprise

-**Common Error Troubleshooting**
+1. In a web browser, open the taosExplorer interface for TDengine. This interface is located on port 6060 on the hostname or IP address running TDengine.
+2. In the main menu on the left, click **Management** and open the **Backup** tab.
+3. Under **Backup Plan**, click **Create New Backup** to define your backup plan.
+   1. **Database:** Select the database that you want to back up.
+   2. **Super Table:** (Optional) Select the supertable that you want to back up. If you do not select a supertable, all data in the database is backed up.
+   3. **Next execution time:** Enter the date and time when you want to perform the initial backup for this backup plan. If you specify a date and time in the past, the initial backup is performed immediately.
+   4. **Backup Cycle:** Specify how often you want to perform incremental backups. The value of this field must be less than the value of `WAL_RETENTION_PERIOD` for the specified database.
+   5. **Retry times:** Enter how many times you want to retry a backup task that has failed, provided that the specific failure might be resolved by retrying.
+   6. **Retry interval:** Enter the delay in seconds between retry attempts.
+   7. **Directory:** Enter the full path of the directory in which you want to store backup files.
+   8. **Backup file max size:** Enter the maximum size of a single backup file. If the total size of your backup exceeds this number, the backup is split into multiple files.
+   9. **Compression level:** Select **fastest** for the fastest performance but lowest compression ratio, **best** for the highest compression ratio but slowest performance, or **balanced** for a combination of performance and compression.

-1. If the task fails to start and reports the following error:
+4. Click **Confirm** to create the backup plan.
+
+You can view your backup plans and modify, clone, or delete them using the buttons in the **Operation** column. Click **Refresh** to update the status of your plans. Note that you must stop a backup plan before you can delete it. You can also click **View** in the **Backup File** column to view the backup record points and files created by each plan.
+
+### Restore Data in TDengine Enterprise
+
+1. Locate the backup plan containing data that you want to restore and click **View** in the **Backup File** column.
+2. Determine the backup record point to which you want to restore and click the Restore icon in the **Operation** column.
+3. Select the backup file timestamp and target database and click **Confirm**.
+
+## Troubleshooting
+
+### Port Access Exception
+
+A port access exception is indicated by the following error:

```text
Error: tmq to td task exec error

@ -54,9 +98,11 @@ Caused by:

[0x000B] Unable to establish connection
```

-The cause is an abnormal connection to the data source port, check whether the data source FQDN is connected and whether port 6030 is accessible.
+If you encounter this error, check whether the data source FQDN is connected and whether port 6030 is listening and accessible.
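Independently of the task itself, reachability can be verified with a plain TCP connect. A minimal sketch (the host name is a placeholder; this only proves the port is listening, not that the service behind it is healthy):

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    public static void main(String[] args) throws Exception {
        // Replace with the data source FQDN from the failing task's configuration.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("tdengine.example.com", 6030), 3000);
            System.out.println("port 6030 is reachable");
        }
    }
}
```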
-2. If using a WebSocket connection, the task fails to start and reports the following error:
+### Connection Issues
+
+A connection issue is indicated by the task failing to start and reporting the following error:

```text
Error: tmq to td task exec error

@ -67,15 +113,16 @@ Caused by:

2: failed to lookup address information: Temporary failure in name resolution
```

-When using a WebSocket connection, you may encounter various types of errors, which can be seen after "Caused by". Here are some possible errors:
-
-- "Temporary failure in name resolution": DNS resolution error, check if the IP or FQDN can be accessed normally.
-- "IO error: Connection refused (os error 111)": Port access failure, check if the port is configured correctly or if it is open and accessible.
-- "IO error: received corrupt message": Message parsing failed, possibly because SSL was enabled using wss, but the source port does not support it.
-- "HTTP error: *": Possibly connected to the wrong taosAdapter port or incorrect LSB/Nginx/Proxy configuration.
-- "WebSocket protocol error: Handshake not finished": WebSocket connection error, usually because the configured port is incorrect.
+The following are some possible errors for WebSocket connections:
+
+- "Temporary failure in name resolution": DNS resolution error. Check whether the specified IP address or FQDN can be accessed normally.
+- "IO error: Connection refused (os error 111)": Port access failed. Check whether the port is configured correctly and is enabled and accessible.
+- "IO error: received corrupt message": Message parsing failed. This may be because SSL was enabled using the WSS method, but the source port is not supported.
+- "HTTP error: *": Confirm that you are connecting to the correct taosAdapter port and that your LSB/Nginx/Proxy has been configured correctly.
+- "WebSocket protocol error: Handshake not finished": WebSocket connection error. This is typically caused by an incorrectly configured port.
+
+### WAL Configuration

-3. If the task fails to start and reports the following error:
+A WAL configuration issue is indicated by the task failing to start and reporting the following error:

```text
Error: tmq to td task exec error

@ -84,11 +131,8 @@ Caused by:

[0x038C] WAL retention period is zero
```

-This is due to incorrect WAL configuration in the source database, preventing subscription.
-
-Solution:
-Modify the data WAL configuration:
+To resolve this error, modify the WAL retention period for the affected database:

```sql
-alter database test wal_retention_period 3600;
+ALTER DATABASE test WAL_RETENTION_PERIOD 3600;
```
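The same statement can also be issued from application code. A minimal JDBC sketch (the WebSocket URL and credentials are placeholders; it assumes the TDengine JDBC driver is on the classpath, as in the Spring examples later in this commit):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class WalRetentionFix {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:TAOS-WS://localhost:6041/?user=root&password=taosdata";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement()) {
            // Allow subscription by retaining WAL entries for one hour.
            stmt.executeUpdate("ALTER DATABASE test WAL_RETENTION_PERIOD 3600");
        }
    }
}
```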

View File

@ -3,13 +3,9 @@ title: Release Notes
slug: /release-history/release-notes
---

-[3.3.5.0](./3-3-5-0/)
-[3.3.5.2](./3.3.5.2)
-[3.3.4.8](./3-3-4-8/)
-[3.3.4.3](./3-3-4-3/)
-[3.3.3.0](./3-3-3-0/)
-[3.3.2.0](./3-3-2-0/)
+```mdx-code-block
+import DocCardList from '@theme/DocCardList';
+import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
+
+<DocCardList items={useCurrentSidebarCategory().items}/>
+```

Binary file not shown.


View File

@ -5,7 +5,7 @@
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
-   <version>2.4.0</version>
+   <version>2.7.18</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.taosdata.example</groupId>

@ -18,6 +18,18 @@

    <java.version>1.8</java.version>
</properties>

+<dependencyManagement>
+    <dependencies>
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-bom</artifactId>
+            <version>3.5.10.1</version>
+            <type>pom</type>
+            <scope>import</scope>
+        </dependency>
+    </dependencies>
+</dependencyManagement>
+
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>

@ -28,14 +40,21 @@

        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
+   <!-- optional module for Spring Boot 2 -->
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
-       <version>3.1.2</version>
    </dependency>
+   <!-- optional module for JDK 8+ -->
+   <dependency>
+       <groupId>com.baomidou</groupId>
+       <artifactId>mybatis-plus-jsqlparser-4.9</artifactId>
+   </dependency>
    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
+       <version>2.3.232</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>

View File

@ -1,34 +1,26 @@
package com.taosdata.example.mybatisplusdemo.config;

-import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
+import com.baomidou.mybatisplus.annotation.DbType;
+import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
+import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
+import org.mybatis.spring.annotation.MapperScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.transaction.annotation.EnableTransactionManagement;

+@EnableTransactionManagement
@Configuration
+@MapperScan("com.taosdata.example.mybatisplusdemo.mapper")
public class MybatisPlusConfig {

-    /** mybatis 3.4.1 pagination config start ***/
-    // @Bean
-    // public MybatisPlusInterceptor mybatisPlusInterceptor() {
-    //     MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
-    //     interceptor.addInnerInterceptor(new PaginationInnerInterceptor());
-    //     return interceptor;
-    // }
-    // @Bean
-    // public ConfigurationCustomizer configurationCustomizer() {
-    //     return configuration -> configuration.setUseDeprecatedExecutor(false);
-    // }
+    /**
+     * Add the pagination plugin
+     */
    @Bean
-    public PaginationInterceptor paginationInterceptor() {
-        // return new PaginationInterceptor();
-        PaginationInterceptor paginationInterceptor = new PaginationInterceptor();
-        //TODO: mybatis-plus do not support TDengine, use postgresql Dialect
-        paginationInterceptor.setDialectType("postgresql");
-        return paginationInterceptor;
+    public MybatisPlusInterceptor mybatisPlusInterceptor() {
+        MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();
+        interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.MYSQL));
+        return interceptor;
    }
}
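With the interceptor registered, paging goes through the standard MyBatis-Plus `selectPage` call. A minimal usage sketch (the helper class is hypothetical; `WeatherMapper` extends `BaseMapper<Weather>` elsewhere in this example project):

```java
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.taosdata.example.mybatisplusdemo.domain.Weather;
import com.taosdata.example.mybatisplusdemo.mapper.WeatherMapper;

public class PaginationSketch {
    // Assumes a Spring-injected WeatherMapper.
    static void printFirstPage(WeatherMapper weatherMapper) {
        Page<Weather> page = new Page<>(1, 10); // page 1, 10 rows per page
        IPage<Weather> result = weatherMapper.selectPage(page, null);
        System.out.println("total rows: " + result.getTotal());
    }
}
```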

View File

@ -5,6 +5,7 @@ import com.taosdata.example.mybatisplusdemo.domain.Meters;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
+import org.apache.ibatis.executor.BatchResult;

import java.util.List;

@ -15,17 +16,6 @@ public interface MetersMapper extends BaseMapper<Meters> {

    @Insert("insert into meters (tbname, ts, groupid, location, current, voltage, phase) values(#{tbname}, #{ts}, #{groupid}, #{location}, #{current}, #{voltage}, #{phase})")
    int insertOne(Meters one);

-    @Insert({
-            "<script>",
-            "insert into meters (tbname, ts, groupid, location, current, voltage, phase) values ",
-            "<foreach collection='list' item='item' index='index' separator=','>",
-            "(#{item.tbname}, #{item.ts}, #{item.groupid}, #{item.location}, #{item.current}, #{item.voltage}, #{item.phase})",
-            "</foreach>",
-            "</script>"
-    })
-    int insertBatch(@Param("list") List<Meters> metersList);

    @Update("drop stable if exists meters")
    void dropTable();
}
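The removed `<script>`-based batch insert is superseded by the `List`-based `insert(entityList, batchSize)` overload that MyBatis-Plus 3.5 provides on `BaseMapper`, as exercised by the updated `MetersMapperTest` later in this commit. A minimal sketch (the wrapper class is hypothetical):

```java
import com.taosdata.example.mybatisplusdemo.domain.Meters;
import com.taosdata.example.mybatisplusdemo.mapper.MetersMapper;
import org.apache.ibatis.executor.BatchResult;

import java.util.List;

public class MetersBatchSketch {
    // Assumes a Spring-injected MetersMapper (extends BaseMapper<Meters>).
    static List<BatchResult> saveAll(MetersMapper mapper, List<Meters> rows) {
        return mapper.insert(rows, 10000); // flushed in JDBC batches of 10000 rows
    }
}
```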

View File

@ -11,9 +11,6 @@ public interface TemperatureMapper extends BaseMapper<Temperature> {
@Update("CREATE TABLE if not exists temperature(ts timestamp, temperature float) tags(location nchar(64), tbIndex int)") @Update("CREATE TABLE if not exists temperature(ts timestamp, temperature float) tags(location nchar(64), tbIndex int)")
int createSuperTable(); int createSuperTable();
@Update("create table #{tbName} using temperature tags( #{location}, #{tbindex})")
int createTable(@Param("tbName") String tbName, @Param("location") String location, @Param("tbindex") int tbindex);
@Update("drop table if exists temperature") @Update("drop table if exists temperature")
void dropSuperTable(); void dropSuperTable();

View File

@ -10,7 +10,7 @@ public interface WeatherMapper extends BaseMapper<Weather> {
@Update("CREATE TABLE if not exists weather(ts timestamp, temperature float, humidity int, location nchar(100))") @Update("CREATE TABLE if not exists weather(ts timestamp, temperature float, humidity int, location nchar(100))")
int createTable(); int createTable();
@Insert("insert into weather (ts, temperature, humidity, location) values(#{ts}, #{temperature}, #{humidity}, #{location})") @Insert("insert into weather (ts, temperature, humidity, location) values(#{ts}, #{temperature}, #{humidity}, #{location, jdbcType=NCHAR})")
int insertOne(Weather one); int insertOne(Weather one);
@Update("drop table if exists weather") @Update("drop table if exists weather")

View File

@ -0,0 +1,19 @@
package com.taosdata.example.mybatisplusdemo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
@Service
public class DatabaseConnectionService {
@Autowired
private DataSource dataSource;
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
}

View File

@ -0,0 +1,23 @@
package com.taosdata.example.mybatisplusdemo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@Service
public class TemperatureService {
@Autowired
private DatabaseConnectionService databaseConnectionService;
public void createTable(String tableName, String location, int tbIndex) throws SQLException {
try (Connection connection = databaseConnectionService.getConnection();
Statement statement = connection.createStatement()) {
statement.executeUpdate("create table " + tableName + " using temperature tags( '" + location +"', " + tbIndex + ")");
}
}
}

View File

@ -5,6 +5,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.taosdata.example.mybatisplusdemo.domain.Meters;
import com.taosdata.example.mybatisplusdemo.domain.Weather;
+import org.apache.ibatis.executor.BatchResult;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@ -18,6 +19,8 @@ import java.util.LinkedList;

import java.util.List;
import java.util.Random;

+import static java.sql.Statement.SUCCESS_NO_INFO;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class MetersMapperTest {

@ -63,8 +66,19 @@

            metersList.add(one);
        }

-        int affectRows = mapper.insertBatch(metersList);
-        Assert.assertEquals(100, affectRows);
+        List<BatchResult> affectRowsList = mapper.insert(metersList, 10000);
+
+        long totalAffectedRows = 0;
+        for (BatchResult batchResult : affectRowsList) {
+            int[] updateCounts = batchResult.getUpdateCounts();
+            for (int status : updateCounts) {
+                if (status == SUCCESS_NO_INFO) {
+                    totalAffectedRows++;
+                }
+            }
+        }
+
+        Assert.assertEquals(100, totalAffectedRows);
    }

    @Test

@ -93,7 +107,7 @@

    @Test
    public void testSelectCount() {
-        int count = mapper.selectCount(null);
+        long count = mapper.selectCount(null);
        // Assert.assertEquals(5, count);
        System.out.println(count);
    }

View File

@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.taosdata.example.mybatisplusdemo.domain.Temperature;
+import com.taosdata.example.mybatisplusdemo.service.TemperatureService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

@ -13,6 +14,8 @@ import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.List;

@ -22,18 +25,20 @@ import java.util.Random;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class TemperatureMapperTest {

+    @Autowired
+    private TemperatureService temperatureService;

    private static Random random = new Random(System.currentTimeMillis());
    private static String[] locations = {"北京", "上海", "深圳", "广州", "杭州"};

    @Before
-    public void before() {
+    public void before() throws SQLException {
        mapper.dropSuperTable();
        // create table temperature
        mapper.createSuperTable();
        // create table t_X using temperature
        for (int i = 0; i < 10; i++) {
-            mapper.createTable("t" + i, locations[random.nextInt(locations.length)], i);
+            temperatureService.createTable("t" + i, locations[i % locations.length], i);
        }
        // insert into table
        int affectRows = 0;

@ -107,7 +112,7 @@

     * **/
    @Test
    public void testSelectCount() {
-        int count = mapper.selectCount(null);
+        long count = mapper.selectCount(null);
        Assert.assertEquals(10, count);
    }

View File

@ -52,7 +52,7 @@ public class WeatherMapperTest {
        one.setTemperature(random.nextFloat() * 50);
        one.setHumidity(random.nextInt(100));
        one.setLocation("望京");
-        int affectRows = mapper.insert(one);
+        int affectRows = mapper.insertOne(one);
        Assert.assertEquals(1, affectRows);
    }

@ -82,7 +82,7 @@

    @Test
    public void testSelectCount() {
-        int count = mapper.selectCount(null);
+        long count = mapper.selectCount(null);
        // Assert.assertEquals(5, count);
        System.out.println(count);
    }

View File

@ -5,7 +5,7 @@
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
-   <version>2.6.15</version>
+   <version>2.7.18</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.taosdata.example</groupId>

@ -34,7 +34,7 @@

<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
-   <version>2.1.1</version>
+   <version>2.3.2</version>
</dependency>
<dependency>

View File

@ -50,13 +50,6 @@
    ), groupId int)
</update>

-<update id="createTable" parameterType="com.taosdata.example.springbootdemo.domain.Weather">
-    create table if not exists test.t#{groupId} using test.weather tags
-    (
-        #{location},
-        #{groupId}
-    )
-</update>

<select id="select" resultMap="BaseResultMap">
    select * from test.weather order by ts desc

@ -69,8 +62,8 @@

</select>

<insert id="insert" parameterType="com.taosdata.example.springbootdemo.domain.Weather">
-    insert into test.t#{groupId} (ts, temperature, humidity, note, bytes)
-    values (#{ts}, ${temperature}, ${humidity}, #{note}, #{bytes})
+    insert into test.t${groupId} (ts, temperature, humidity, note, bytes)
+    values (#{ts}, #{temperature}, #{humidity}, #{note}, #{bytes})
</insert>

<select id="getSubTables" resultType="String">

View File

@ -0,0 +1,19 @@
package com.taosdata.example.springbootdemo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
@Service
public class DatabaseConnectionService {
@Autowired
private DataSource dataSource;
public Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
}

View File

@ -6,6 +6,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

@ -16,6 +19,9 @@ public class WeatherService {

    @Autowired
    private WeatherMapper weatherMapper;
+    @Autowired
+    private DatabaseConnectionService databaseConnectionService;

    private Random random = new Random(System.currentTimeMillis());
    private String[] locations = {"北京", "上海", "广州", "深圳", "天津"};

@ -32,7 +38,7 @@

            weather.setGroupId(i % locations.length);
            weather.setNote("note-" + i);
            weather.setBytes(locations[random.nextInt(locations.length)].getBytes(StandardCharsets.UTF_8));
-            weatherMapper.createTable(weather);
+            createTable(weather);
            count += weatherMapper.insert(weather);
        }
        return count;

@ -78,4 +84,14 @@

        weather.setLocation(location);
        return weather;
    }

+    public void createTable(Weather weather) {
+        try (Connection connection = databaseConnectionService.getConnection();
+             Statement statement = connection.createStatement()) {
+            String tableName = "t" + weather.getGroupId();
+            statement.executeUpdate("create table if not exists " + tableName + " using test.weather tags( '" + weather.getLocation() + "', " + weather.getGroupId() + ")");
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
}

View File

@ -4,8 +4,8 @@
#spring.datasource.username=root
#spring.datasource.password=taosdata
# datasource config - JDBC-RESTful
-spring.datasource.driver-class-name=com.taosdata.jdbc.rs.RestfulDriver
-spring.datasource.url=jdbc:TAOS-RS://localhost:6041/test
+spring.datasource.driver-class-name=com.taosdata.jdbc.ws.WebSocketDriver
+spring.datasource.url=jdbc:TAOS-WS://localhost:6041/test
spring.datasource.username=root
spring.datasource.password=taosdata
spring.datasource.druid.initial-size=5

View File

@ -30,12 +30,12 @@ The concept of a TDengine consumer is similar to Kafka: consumers receive data by subscribing to topics

| Parameter Name | Type | Description | Remarks |
| :-----------------------: | :-----: | ------------------------------------------------------------ | ------------------------------------------------------------ |
-| `td.connect.ip` | string | Server IP address | |
+| `td.connect.ip` | string | Server FQDN | can be an IP address or a host name |
| `td.connect.user` | string | Username | |
| `td.connect.pass` | string | Password | |
| `td.connect.port` | integer | Server port number | |
-| `group.id` | string | Consumer group ID; the same consumer group shares consumption progress | <br />**Required**. Maximum length: 192.<br />Each topic supports up to 100 consumer groups |
+| `group.id` | string | Consumer group ID; the same consumer group shares consumption progress | <br />**Required**. Maximum length: 192; excess length will be truncated.<br />Each topic supports up to 100 consumer groups |
-| `client.id` | string | Client ID | Maximum length: 192 |
+| `client.id` | string | Client ID | Maximum length: 255; excess length will be truncated. |
| `auto.offset.reset` | enum | Initial position of the consumer group subscription | <br />`earliest`: default (version < 3.2.0.0); subscribe from the beginning; <br/>`latest`: default (version >= 3.2.0.0); subscribe only from the latest data; <br/>`none`: cannot subscribe without a committed offset |
| `enable.auto.commit` | boolean | Whether to enable automatic offset commit; true: commit automatically, the client application does not need to commit; false: the client application must commit manually | Default is true |
| `auto.commit.interval.ms` | integer | Interval for automatically committing consumed offsets, in milliseconds | Default is 5000 |

View File

@ -4,9 +4,9 @@ sidebar_label: Release Notes

description: Release notes for each version
---

-[3.3.5.2](./3.3.5.2)
-[3.3.5.0](./3.3.5.0)
-[3.3.4.8](./3.3.4.8)
-[3.3.4.3](./3.3.4.3)
-[3.3.3.0](./3.3.3.0)
-[3.3.2.0](./3.3.2.0)
+```mdx-code-block
+import DocCardList from '@theme/DocCardList';
+import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
+
+<DocCardList items={useCurrentSidebarCategory().items}/>
+```

View File

@ -37,6 +37,7 @@ typedef enum {
    SYNC_RD_QUEUE,
    STREAM_QUEUE,
    ARB_QUEUE,
+   STREAM_CTRL_QUEUE,
    QUEUE_MAX,
} EQueueType;

View File

@ -383,7 +383,7 @@ static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
        return;
    }
    SVCreateStbReq req = {0};
-   SDecoder       coder;
+   SDecoder       coder = {0};
    uDebug("create stable data:%p", metaRsp);

    // decode and process req

@ -1020,7 +1020,7 @@

        return TSDB_CODE_INVALID_PARA;
    }
    SVCreateStbReq req = {0};
-   SDecoder       coder;
+   SDecoder       coder = {0};
    SMCreateStbReq pReq = {0};
    int32_t        code = TSDB_CODE_SUCCESS;
    SRequestObj*   pRequest = NULL;

@ -2327,7 +2327,7 @@

        uError("invalid parameter in %s", __func__);
        return;
    }
-   SDecoder        coder;
+   SDecoder        coder = {0};
    SMqBatchMetaRsp rsp = {0};
    int32_t         code = 0;
    cJSON*          pJson = NULL;

View File

@ -13,6 +13,7 @@
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

+#include "tcol.h"
#include "tcompression.h"
#include "tutil.h"
#include "tutil.h" #include "tutil.h"

View File

@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
    const char           *name;
    SQueryAutoQWorkerPool queryPool;
    SAutoQWorkerPool      streamPool;
+   SWWorkerPool          streamCtrlPool;
    SWWorkerPool          fetchPool;
    SSingleWorker         mgmtWorker;
    SSingleWorker         mgmtMultiWorker;

@ -73,6 +74,7 @@ typedef struct {

    SMultiWorker pApplyW;
    STaosQueue  *pQueryQ;
    STaosQueue  *pStreamQ;
+   STaosQueue  *pStreamCtrlQ;
    STaosQueue  *pFetchQ;
    STaosQueue  *pMultiMgmQ;
} SVnodeObj;

@ -134,6 +136,7 @@ int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
+int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -1006,12 +1006,12 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
-if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
-if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
+if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
-if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
-if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
+if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

@ -1022,7 +1022,7 @@

if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
-if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;

View File

@ -395,9 +395,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
    while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
    tqNotifyClose(pVnode->pImpl->pTq);
+   dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
    while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
+   dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
+   while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
    dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

    dInfo("vgId:%d, post close", pVnode->vgId);

View File

@ -137,6 +137,34 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
  taosFreeQitem(pMsg);
}

+static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall *pQall, int32_t numOfItems) {
+  SVnodeObj *pVnode = pInfo->ahandle;
+  void      *pItem = NULL;
+  int32_t    code = 0;
+
+  while (1) {
+    if (taosGetQitem(pQall, &pItem) == 0) {
+      break;
+    }
+
+    SRpcMsg        *pMsg = pItem;
+    const STraceId *trace = &pMsg->info.traceId;
+
+    dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg);
+    code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
+    if (code != 0) {
+      terrno = code;
+      dGError("vgId:%d, msg:%p failed to process stream ctrl msg %s since %s", pVnode->vgId, pMsg,
+              TMSG_INFO(pMsg->msgType), tstrerror(code));
+      vmSendRsp(pMsg, code);
+    }
+
+    dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
+    rpcFreeCont(pMsg->pCont);
+    taosFreeQitem(pMsg);
+  }
+}

static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
  SVnodeObj *pVnode = pInfo->ahandle;
  SRpcMsg   *pMsg = NULL;

@ -245,6 +273,10 @@

      dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
      code = taosWriteQitem(pVnode->pStreamQ, pMsg);
      break;
+   case STREAM_CTRL_QUEUE:
+     dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg);
+     code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
+     break;
    case FETCH_QUEUE:
      dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
      code = taosWriteQitem(pVnode->pFetchQ, pMsg);

@ -301,6 +333,8 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg

int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }

+int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }

int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
  const STraceId *trace = &pMsg->info.traceId;
  dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);

@ -373,6 +407,8 @@

    case STREAM_QUEUE:
      size = taosQueueItemSize(pVnode->pStreamQ);
      break;
+   case STREAM_CTRL_QUEUE:
+     size = taosQueueItemSize(pVnode->pStreamCtrlQ);
    default:
      break;
  }

@ -417,9 +453,11 @@

  pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
  pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
  pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
+ pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);

  if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
-     pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
+     pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
+     || pVnode->pStreamCtrlQ == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

@ -435,15 +473,19 @@

  dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
        taosQueueGetThreadId(pVnode->pFetchQ));
  dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
+ dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
+       taosQueueGetThreadId(pVnode->pStreamCtrlQ));
  return 0;
}

void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
  tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
  tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
+ tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
  tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
  pVnode->pQueryQ = NULL;
  pVnode->pStreamQ = NULL;
+ pVnode->pStreamCtrlQ = NULL;
  pVnode->pFetchQ = NULL;
  dDebug("vgId:%d, queue is freed", pVnode->vgId);
}

@ -463,6 +505,11 @@

  pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
  if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;

+ SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
+ pStreamCtrlPool->name = "vnode-ctrl-stream";
+ pStreamCtrlPool->max = 1;
+ if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;

  SWWorkerPool *pFPool = &pMgmt->fetchPool;
  pFPool->name = "vnode-fetch";
  pFPool->max = tsNumOfVnodeFetchThreads;

@ -494,6 +541,7 @@

void vmStopWorker(SVnodeMgmt *pMgmt) {
  tQueryAutoQWorkerCleanup(&pMgmt->queryPool);
  tAutoQWorkerCleanup(&pMgmt->streamPool);
+ tWWorkerCleanup(&pMgmt->streamCtrlPool);
  tWWorkerCleanup(&pMgmt->fetchPool);
  dDebug("vnode workers are closed");
}

View File

@ -112,6 +112,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
+int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void    vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void    vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void    vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);

View File

@ -930,7 +930,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
} }
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) && pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) { !syncIsReadyForRead(pVnode->sync)) {
@ -941,14 +941,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK:
return tqProcessTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP:
return tqProcessTaskCheckRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE: case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: case TDMT_STREAM_RETRIEVE_RSP:
@ -963,8 +955,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg); return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP: case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg); return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP: case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS: case TDMT_VND_GET_STREAM_PROGRESS:
@ -977,6 +967,32 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
} }
} }
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in stream ctrl queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0;
}
switch (pMsg->msgType) {
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK:
return tqProcessTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP:
return tqProcessTaskCheckRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;
}
}
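
Taken together with the `vmworker.c` hunk above, this is a control-plane/data-plane split: long-running task-run and retrieve work stays on the stream queue, while dispatch, check, and heartbeat traffic moves to the dedicated ctrl queue where it cannot be starved by heavy data processing. A hedged Go sketch of the routing decision (the message-type names are copied from the diff; everything else is illustrative):

```go
package main

import "fmt"

// route decides which queue a stream message belongs on, mirroring the
// split between vnodeProcessStreamMsg and vnodeProcessStreamCtrlMsg.
func route(msgType string) string {
	switch msgType {
	case "TDMT_MND_STREAM_HEARTBEAT_RSP",
		"TDMT_STREAM_TASK_DISPATCH",
		"TDMT_STREAM_TASK_DISPATCH_RSP",
		"TDMT_VND_STREAM_TASK_CHECK",
		"TDMT_VND_STREAM_TASK_CHECK_RSP":
		return "stream-ctrl" // small, latency-sensitive control messages
	default:
		return "stream" // task-run, retrieve, and checkpoint data work
	}
}

func main() {
	for _, t := range []string{"TDMT_STREAM_TASK_RUN", "TDMT_VND_STREAM_TASK_CHECK"} {
		fmt.Printf("%s -> %s queue\n", t, route(t))
	}
}
```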
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
if (code) { if (code) {

View File

@ -17,6 +17,7 @@ This manual is intended to give developers comprehensive guidance on testing TDengine
> [!NOTE] > [!NOTE]
> - The commands and scripts below are verified on Linux (Ubuntu 18.04/20.04/22.04). > - The commands and scripts below are verified on Linux (Ubuntu 18.04/20.04/22.04).
> - [taos-connector-python](https://github.com/taosdata/taos-connector-python) is used by the tests written in Python; it requires Python 3.7+.
> - The commands and steps described below are to run the tests on a single host. > - The commands and steps described below are to run the tests on a single host.
# 2. Prerequisites # 2. Prerequisites

View File

@ -0,0 +1,3 @@
select AVG(t0) ;
select TOP(c1,100) ;
select LAST_ROW(c1) ;

View File

@ -24,4 +24,3 @@ for i in `find tools/taosdump/native/ -name "*.py"`
((count=count+1)) ((count=count+1))
done done

View File

@ -63,7 +63,6 @@ docker run \
-v /root/.cos-local.1:/root/.cos-local.2 \ -v /root/.cos-local.1:/root/.cos-local.2 \
-v ${REP_REAL_PATH}/enterprise/contrib/grant-lib:${REP_DIR}/enterprise/contrib/grant-lib \ -v ${REP_REAL_PATH}/enterprise/contrib/grant-lib:${REP_DIR}/enterprise/contrib/grant-lib \
-v ${REP_REAL_PATH}/community/tools/taosadapter:${REP_DIR}/community/tools/taosadapter \ -v ${REP_REAL_PATH}/community/tools/taosadapter:${REP_DIR}/community/tools/taosadapter \
-v ${REP_REAL_PATH}/community/tools/taos-tools:${REP_DIR}/community/tools/taos-tools \
-v ${REP_REAL_PATH}/community/tools/taosws-rs:${REP_DIR}/community/tools/taosws-rs \ -v ${REP_REAL_PATH}/community/tools/taosws-rs:${REP_DIR}/community/tools/taosws-rs \
-v ${REP_REAL_PATH}/community/contrib/apr/:${REP_DIR}/community/contrib/apr \ -v ${REP_REAL_PATH}/community/contrib/apr/:${REP_DIR}/community/contrib/apr \
-v ${REP_REAL_PATH}/community/contrib/apr-util/:${REP_DIR}/community/contrib/apr-util \ -v ${REP_REAL_PATH}/community/contrib/apr-util/:${REP_DIR}/community/contrib/apr-util \
@ -80,7 +79,6 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/mxml/:${REP_DIR}/community/contrib/mxml \ -v ${REP_REAL_PATH}/community/contrib/mxml/:${REP_DIR}/community/contrib/mxml \
-v ${REP_REAL_PATH}/community/contrib/openssl/:${REP_DIR}/community/contrib/openssl \ -v ${REP_REAL_PATH}/community/contrib/openssl/:${REP_DIR}/community/contrib/openssl \
-v ${REP_REAL_PATH}/community/contrib/pcre2/:${REP_DIR}/community/contrib/pcre2 \ -v ${REP_REAL_PATH}/community/contrib/pcre2/:${REP_DIR}/community/contrib/pcre2 \
-v ${REP_REAL_PATH}/community/contrib/xml2/:${REP_DIR}/community/contrib/xml2 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/zstd/:${REP_DIR}/community/contrib/zstd \ -v ${REP_REAL_PATH}/community/contrib/zstd/:${REP_DIR}/community/contrib/zstd \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DCMAKE_EXPORT_COMPILE_COMMANDS=1 ;make -j|| exit 1" --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0 -DCMAKE_EXPORT_COMPILE_COMMANDS=1 ;make -j|| exit 1"
@ -116,7 +114,6 @@ docker run \
-v /root/.cos-local.1:/root/.cos-local.2 \ -v /root/.cos-local.1:/root/.cos-local.2 \
-v ${REP_REAL_PATH}/enterprise/contrib/grant-lib:${REP_DIR}/enterprise/contrib/grant-lib \ -v ${REP_REAL_PATH}/enterprise/contrib/grant-lib:${REP_DIR}/enterprise/contrib/grant-lib \
-v ${REP_REAL_PATH}/community/tools/taosadapter:${REP_DIR}/community/tools/taosadapter \ -v ${REP_REAL_PATH}/community/tools/taosadapter:${REP_DIR}/community/tools/taosadapter \
-v ${REP_REAL_PATH}/community/tools/taos-tools:${REP_DIR}/community/tools/taos-tools \
-v ${REP_REAL_PATH}/community/tools/taosws-rs:${REP_DIR}/community/tools/taosws-rs \ -v ${REP_REAL_PATH}/community/tools/taosws-rs:${REP_DIR}/community/tools/taosws-rs \
-v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \ -v ${REP_REAL_PATH}/community/tools/taosws-rs/target:${REP_DIR}/community/tools/taosws-rs/target \
-v ${REP_REAL_PATH}/community/contrib/apr/:${REP_DIR}/community/contrib/apr \ -v ${REP_REAL_PATH}/community/contrib/apr/:${REP_DIR}/community/contrib/apr \
@ -134,7 +131,6 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/mxml/:${REP_DIR}/community/contrib/mxml \ -v ${REP_REAL_PATH}/community/contrib/mxml/:${REP_DIR}/community/contrib/mxml \
-v ${REP_REAL_PATH}/community/contrib/openssl/:${REP_DIR}/community/contrib/openssl \ -v ${REP_REAL_PATH}/community/contrib/openssl/:${REP_DIR}/community/contrib/openssl \
-v ${REP_REAL_PATH}/community/contrib/pcre2/:${REP_DIR}/community/contrib/pcre2 \ -v ${REP_REAL_PATH}/community/contrib/pcre2/:${REP_DIR}/community/contrib/pcre2 \
-v ${REP_REAL_PATH}/community/contrib/xml2/:${REP_DIR}/community/contrib/xml2 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \ -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/zstd/:${REP_DIR}/community/contrib/zstd \ -v ${REP_REAL_PATH}/community/contrib/zstd/:${REP_DIR}/community/contrib/zstd \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=false -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DCMAKE_BUILD_TYPE=Debug -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j|| exit 1 " --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=false -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DCMAKE_BUILD_TYPE=Debug -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j|| exit 1 "

View File

@ -16,6 +16,10 @@ IF(TD_WEBSOCKET)
DEPENDS ${TAOS_LIB} DEPENDS ${TAOS_LIB}
BUILD_IN_SOURCE 1 BUILD_IN_SOURCE 1
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
UPDATE_COMMAND
COMMAND echo "Starting to switch branch"
COMMAND git fetch origin main && git checkout -f origin/main
COMMAND echo "Switched to the main branch successfully"
PATCH_COMMAND PATCH_COMMAND
COMMAND git clean -f -d COMMAND git clean -f -d
BUILD_COMMAND BUILD_COMMAND
@ -35,6 +39,10 @@ IF(TD_WEBSOCKET)
DEPENDS ${TAOS_LIB} DEPENDS ${TAOS_LIB}
BUILD_IN_SOURCE 1 BUILD_IN_SOURCE 1
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
UPDATE_COMMAND
COMMAND echo "Starting to switch branch"
COMMAND git fetch origin main && git checkout -f origin/main
COMMAND echo "Switched to the main branch successfully"
PATCH_COMMAND PATCH_COMMAND
COMMAND git clean -f -d COMMAND git clean -f -d
BUILD_COMMAND BUILD_COMMAND
@ -55,6 +63,10 @@ IF(TD_WEBSOCKET)
DEPENDS ${TAOS_LIB} DEPENDS ${TAOS_LIB}
BUILD_IN_SOURCE 1 BUILD_IN_SOURCE 1
CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config" CONFIGURE_COMMAND cmake -E echo "taosws-rs no need cmake to config"
UPDATE_COMMAND
COMMAND echo "Starting to switch branch"
COMMAND git fetch origin main && git checkout -f origin/main
COMMAND echo "Switched to the main branch successfully"
PATCH_COMMAND PATCH_COMMAND
COMMAND git clean -f -d COMMAND git clean -f -d
BUILD_COMMAND BUILD_COMMAND

View File

@ -332,7 +332,7 @@ func (r *Reporter) handlerFunc() gin.HandlerFunc {
logger.Tracef("report data:%s", string(data)) logger.Tracef("report data:%s", string(data))
if e := json.Unmarshal(data, &report); e != nil { if e := json.Unmarshal(data, &report); e != nil {
logger.Errorf("error occurred while unmarshal request, data:%s, error:%s", data, err) logger.Errorf("error occurred while unmarshal request, data:%s, error:%v", data, e)
return return
} }
var sqls []string var sqls []string

View File

@ -58,7 +58,7 @@ func NewCommand(conf *config.Config) *Command {
panic(err) panic(err)
} }
imp := &Command{ return &Command{
client: client, client: client,
conn: conn, conn: conn,
username: conf.TDengine.Username, username: conf.TDengine.Username,
@ -70,7 +70,6 @@ func NewCommand(conf *config.Config) *Command {
RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name), RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name),
}, },
} }
return imp
} }
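
Dropping the `imp` temporary and returning the composite literal directly is the usual Go shape for a constructor; the same cleanup is applied to `GetCfg` further down. A sketch of the idiom with hypothetical types:

```go
package main

import "fmt"

type command struct {
	username string
	password string
}

// newCommand returns the literal directly instead of binding it to a
// temporary that is returned on the next line.
func newCommand(user, pass string) *command {
	return &command{
		username: user,
		password: pass,
	}
}

func main() {
	fmt.Println(newCommand("root", "taosdata").username)
}
```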
func (cmd *Command) Process(conf *config.Config) { func (cmd *Command) Process(conf *config.Config) {
@ -101,7 +100,7 @@ func (cmd *Command) Process(conf *config.Config) {
} }
func (cmd *Command) ProcessTransfer(conf *config.Config) { func (cmd *Command) ProcessTransfer(conf *config.Config) {
fromTime, err := time.Parse("2006-01-02T15:04:05Z07:00", conf.FromTime) fromTime, err := time.Parse(time.RFC3339, conf.FromTime)
if err != nil { if err != nil {
logger.Errorf("parse fromTime error, msg:%s", err) logger.Errorf("parse fromTime error, msg:%s", err)
return return
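
The switch to `time.RFC3339` is behavior-preserving: the standard-library constant is defined as exactly the literal layout string the old code spelled out by hand. A quick check:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// time.RFC3339 == "2006-01-02T15:04:05Z07:00"; using the named
	// constant just makes the intent obvious at the call site.
	fmt.Println(time.RFC3339 == "2006-01-02T15:04:05Z07:00") // true

	fromTime, err := time.Parse(time.RFC3339, "2025-02-08T11:12:14+08:00")
	if err != nil {
		panic(err)
	}
	fmt.Println(fromTime.UTC())
}
```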
@ -156,6 +155,7 @@ func (cmd *Command) TransferTaosdDnodesInfo() error {
dstTable := "taosd_dnodes_info" dstTable := "taosd_dnodes_info"
return cmd.TransferTableToDst(sql, dstTable, 3) return cmd.TransferTableToDst(sql, dstTable, 3)
} }
func (cmd *Command) TransferTaosdDnodesStatus() error { func (cmd *Command) TransferTaosdDnodesStatus() error {
sql := "select cluster_id, dnode_id, dnode_ep, CASE status WHEN 'ready' THEN 1 ELSE 0 END as status, ts from d_info a where " sql := "select cluster_id, dnode_id, dnode_ep, CASE status WHEN 'ready' THEN 1 ELSE 0 END as status, ts from d_info a where "
dstTable := "taosd_dnodes_status" dstTable := "taosd_dnodes_status"
@ -167,6 +167,7 @@ func (cmd *Command) TransferTaosdDnodesLogDirs1() error {
dstTable := "taosd_dnodes_log_dirs" dstTable := "taosd_dnodes_log_dirs"
return cmd.TransferTableToDst(sql, dstTable, 4) return cmd.TransferTableToDst(sql, dstTable, 4)
} }
func (cmd *Command) TransferTaosdDnodesLogDirs2() error { func (cmd *Command) TransferTaosdDnodesLogDirs2() error {
sql := "select cluster_id, dnode_id, dnode_ep, name as log_dir_name, avail, used, total, ts from temp_dir a where " sql := "select cluster_id, dnode_id, dnode_ep, name as log_dir_name, avail, used, total, ts from temp_dir a where "
dstTable := "taosd_dnodes_log_dirs" dstTable := "taosd_dnodes_log_dirs"
@ -224,13 +225,12 @@ func (cmd *Command) ProcessDrop(conf *config.Config) {
} }
func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum int) { func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum int) {
var buf bytes.Buffer
if len(data.Data) < 1 { if len(data.Data) < 1 {
return return
} }
var buf bytes.Buffer
for _, row := range data.Data { for _, row := range data.Data {
// get one row here // get one row here
buf.WriteString(dstTable) buf.WriteString(dstTable)
@ -262,7 +262,6 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in
// write metrics // write metrics
for j := tagNum; j < len(row)-1; j++ { for j := tagNum; j < len(row)-1; j++ {
switch v := row[j].(type) { switch v := row[j].(type) {
case int: case int:
buf.WriteString(fmt.Sprintf("%s=%ff64", data.Head[j], float64(v))) buf.WriteString(fmt.Sprintf("%s=%ff64", data.Head[j], float64(v)))
@ -292,8 +291,7 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in
if logger.Logger.IsLevelEnabled(logrus.TraceLevel) { if logger.Logger.IsLevelEnabled(logrus.TraceLevel) {
logger.Tracef("buf:%v", buf.String()) logger.Tracef("buf:%v", buf.String())
} }
err := cmd.lineWriteBody(&buf) if err := cmd.lineWriteBody(&buf); err != nil {
if err != nil {
logger.Errorf("insert data error, msg:%s", err) logger.Errorf("insert data error, msg:%s", err)
panic(err) panic(err)
} }
@ -305,8 +303,7 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in
if logger.Logger.IsLevelEnabled(logrus.TraceLevel) { if logger.Logger.IsLevelEnabled(logrus.TraceLevel) {
logger.Tracef("buf:%v", buf.String()) logger.Tracef("buf:%v", buf.String())
} }
err := cmd.lineWriteBody(&buf) if err := cmd.lineWriteBody(&buf); err != nil {
if err != nil {
logger.Errorf("insert data error, msg:%s", err) logger.Errorf("insert data error, msg:%s", err)
panic(err) panic(err)
} }
@ -401,7 +398,6 @@ func (cmd *Command) TransferTaosdClusterBasicInfo() error {
// cluster_info // cluster_info
func (cmd *Command) TransferTableToDst(sql string, dstTable string, tagNum int) error { func (cmd *Command) TransferTableToDst(sql string, dstTable string, tagNum int) error {
ctx := context.Background() ctx := context.Background()
endTime := time.Now() endTime := time.Now()
@ -445,13 +441,12 @@ func (cmd *Command) lineWriteBody(buf *bytes.Buffer) error {
req.Body = io.NopCloser(buf) req.Body = io.NopCloser(buf)
resp, err := cmd.client.Do(req) resp, err := cmd.client.Do(req)
if err != nil { if err != nil {
logger.Errorf("writing metrics exception, msg:%s", err) logger.Errorf("writing metrics exception, msg:%s", err)
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent { if resp.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected status code %d:body:%s", resp.StatusCode, string(body)) return fmt.Errorf("unexpected status code %d:body:%s", resp.StatusCode, string(body))

View File

@ -112,8 +112,7 @@ var once sync.Once
func ConfigLog() { func ConfigLog() {
once.Do(func() { once.Do(func() {
err := SetLevel(config.Conf.LogLevel) if err := SetLevel(config.Conf.LogLevel); err != nil {
if err != nil {
panic(err) panic(err)
} }
writer, err := rotatelogs.New( writer, err := rotatelogs.New(
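
This hunk, like the similar ones above, folds the error check into the `if` statement. Besides saving a line, it scopes `err` to the check itself, so the following `writer, err := rotatelogs.New(...)` can use `:=` without shadowing concerns. A minimal sketch, with a hypothetical `setLevel` standing in for the real function:

```go
package main

import (
	"errors"
	"fmt"
)

func setLevel(level string) error {
	if level == "" {
		return errors.New("empty log level")
	}
	return nil
}

func main() {
	// Idiomatic form: err lives only inside the if statement.
	if err := setLevel("trace"); err != nil {
		panic(err)
	}
	fmt.Println("log level configured")
}
```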

View File

@ -41,33 +41,31 @@ func (s *sysMonitor) collect() {
} }
s.Lock() s.Lock()
defer s.Unlock()
for output := range s.outputs { for output := range s.outputs {
select { select {
case output <- *s.status: case output <- *s.status:
default: default:
} }
} }
s.Unlock()
} }
func (s *sysMonitor) Register(c chan<- SysStatus) { func (s *sysMonitor) Register(c chan<- SysStatus) {
s.Lock() s.Lock()
defer s.Unlock()
if s.outputs == nil { if s.outputs == nil {
s.outputs = map[chan<- SysStatus]struct{}{ s.outputs = map[chan<- SysStatus]struct{}{}
c: {},
}
} else {
s.outputs[c] = struct{}{}
} }
s.Unlock() s.outputs[c] = struct{}{}
} }
func (s *sysMonitor) Deregister(c chan<- SysStatus) { func (s *sysMonitor) Deregister(c chan<- SysStatus) {
s.Lock() s.Lock()
defer s.Unlock()
if s.outputs != nil { if s.outputs != nil {
delete(s.outputs, c) delete(s.outputs, c)
} }
s.Unlock()
} }
var SysMonitor = &sysMonitor{status: &SysStatus{}} var SysMonitor = &sysMonitor{status: &SysStatus{}}
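
Replacing the trailing `s.Unlock()` calls with `defer s.Unlock()` immediately after `s.Lock()` guarantees the mutex is released on every path out of the method, including early returns and panics, and it lets the register path collapse into an unconditional map insert. A small sketch of the pattern under the same assumptions:

```go
package main

import (
	"fmt"
	"sync"
)

type monitor struct {
	sync.Mutex
	outputs map[chan<- int]struct{}
}

// Register stays correct even if a future early return is added: the
// deferred Unlock runs no matter how the function exits.
func (m *monitor) Register(c chan<- int) {
	m.Lock()
	defer m.Unlock()
	if m.outputs == nil {
		m.outputs = map[chan<- int]struct{}{}
	}
	m.outputs[c] = struct{}{}
}

func main() {
	m := &monitor{}
	ch := make(chan int)
	m.Register(ch)
	fmt.Println(len(m.outputs)) // 1
}
```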

View File

@ -60,7 +60,7 @@ func EscapeInfluxProtocol(s string) string {
} }
func GetCfg() *config.Config { func GetCfg() *config.Config {
c := &config.Config{ return &config.Config{
InstanceID: 64, InstanceID: 64,
Port: 6043, Port: 6043,
LogLevel: "trace", LogLevel: "trace",
@ -87,7 +87,6 @@ func GetCfg() *config.Config {
ReservedDiskSize: 1073741824, ReservedDiskSize: 1073741824,
}, },
} }
return c
} }
func SafeSubstring(s string, n int) string { func SafeSubstring(s string, n int) string {
@ -123,8 +122,7 @@ func GetQidOwn() uint64 {
atomic.StoreUint64(&globalCounter64, 1) atomic.StoreUint64(&globalCounter64, 1)
id = 1 id = 1
} }
qid64 := uint64(config.Conf.InstanceID)<<56 | id return uint64(config.Conf.InstanceID)<<56 | id
return qid64
} }
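
`GetQidOwn` packs an 8-bit instance ID into the top byte of a 64-bit query ID, leaving 56 bits for the per-process counter. A worked example of the packing and how the two fields come back apart (the helper name is hypothetical):

```go
package main

import "fmt"

// packQid puts the instance ID in the top 8 bits and the monotonically
// increasing counter in the low 56 bits, as GetQidOwn does.
func packQid(instanceID uint8, id uint64) uint64 {
	return uint64(instanceID)<<56 | (id & ((1 << 56) - 1))
}

func main() {
	qid := packQid(64, 1)
	fmt.Printf("qid=0x%016x instance=%d counter=%d\n",
		qid, qid>>56, qid&((1<<56)-1))
	// qid=0x4000000000000001 instance=64 counter=1
}
```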
func GetMd5HexStr(str string) string { func GetMd5HexStr(str string) string {
@ -138,7 +136,6 @@ func isValidChar(r rune) bool {
func ToValidTableName(input string) string { func ToValidTableName(input string) string {
var builder strings.Builder var builder strings.Builder
for _, r := range input { for _, r := range input {
if isValidChar(r) { if isValidChar(r) {
builder.WriteRune(unicode.ToLower(r)) builder.WriteRune(unicode.ToLower(r))
@ -146,7 +143,5 @@ func ToValidTableName(input string) string {
builder.WriteRune('_') builder.WriteRune('_')
} }
} }
return builder.String()
result := builder.String()
return result
} }