From 553b57f268aa34d1d813dae6de85da80f06a898e Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Tue, 1 Oct 2024 10:33:00 +0800 Subject: [PATCH 1/3] mod java code comments --- .../JDBC/SpringJdbcTemplate/readme.md | 16 ++--- .../src/main/resources/log4j.properties | 10 ++-- .../src/main/resources/proxool.xml | 21 +++---- .../src/main/java/com/taosdata/Worker.java | 9 +-- docs/examples/JDBC/mybatisplus-demo/readme | 8 +-- docs/examples/JDBC/springbootdemo/readme.md | 14 ++--- docs/examples/JDBC/taosdemo/readme.md | 7 ++- .../taosdemo/TaosDemoApplication.java | 15 +++-- .../taosdata/taosdemo/dao/SubTableMapper.java | 15 ++--- .../taosdemo/dao/SuperTableMapper.java | 22 +++---- .../taosdata/taosdemo/dao/TableMapper.java | 13 ++-- .../taosdemo/service/DatabaseService.java | 4 +- .../taosdemo/service/SubTableService.java | 60 ++++++++++--------- .../taosdemo/service/SuperTableService.java | 2 +- .../taosdemo/service/TableService.java | 5 +- .../service/data/FieldValueGenerator.java | 10 +++- .../service/data/SubTableMetaGenerator.java | 2 +- .../service/data/SuperTableMetaGenerator.java | 13 ++-- .../service/data/TagValueGenerator.java | 2 +- .../taosdemo/utils/TimeStampUtil.java | 6 +- .../src/main/resources/log4j.properties | 10 ++-- 21 files changed, 142 insertions(+), 122 deletions(-) diff --git a/docs/examples/JDBC/SpringJdbcTemplate/readme.md b/docs/examples/JDBC/SpringJdbcTemplate/readme.md index f59bcdbeb5..0e9812385a 100644 --- a/docs/examples/JDBC/SpringJdbcTemplate/readme.md +++ b/docs/examples/JDBC/SpringJdbcTemplate/readme.md @@ -1,11 +1,11 @@ ## TDengine Spring JDBC Template Demo -`Spring JDBC Template` 简化了原生 JDBC Connection 获取释放等操作,使得操作数据库更加方便。 +`Spring JDBC Template` simplifies the operations of acquiring and releasing native JDBC Connections, making database operations more convenient. -### 配置 +### Configuration -修改 `src/main/resources/applicationContext.xml` 文件中 TDengine 的配置信息: +Modify the TDengine configuration in the `src/main/resources/applicationContext.xml` file: ```xml @@ -20,13 +20,15 @@ ``` -### 打包运行 +### Package and run + +Navigate to the `TDengine/tests/examples/JDBC/SpringJdbcTemplate` directory and execute the following commands to generate an executable jar file. -进入 `TDengine/tests/examples/JDBC/SpringJdbcTemplate` 目录下,执行以下命令可以生成可执行 jar 包。 ```shell mvn clean package ``` -打包成功之后,进入 `target/` 目录下,执行以下命令就可运行测试: +After successfully packaging, navigate to the `target/` directory and execute the following commands to run the tests: + ```shell java -jar target/SpringJdbcTemplate-1.0-SNAPSHOT-jar-with-dependencies.jar -``` \ No newline at end of file +``` diff --git a/docs/examples/JDBC/connectionPools/src/main/resources/log4j.properties b/docs/examples/JDBC/connectionPools/src/main/resources/log4j.properties index 1299357be3..a7f4d3d492 100644 --- a/docs/examples/JDBC/connectionPools/src/main/resources/log4j.properties +++ b/docs/examples/JDBC/connectionPools/src/main/resources/log4j.properties @@ -1,21 +1,21 @@ -### 设置### +### Settings### log4j.rootLogger=debug,stdout,DebugLog,ErrorLog -### 输出信息到控制抬 ### +### Output information to the console ### log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n -### 输出DEBUG 级别以上的日志到=logs/debug.log +### Output logs of DEBUG level and above to logs/debug.log log4j.appender.DebugLog=org.apache.log4j.DailyRollingFileAppender log4j.appender.DebugLog.File=logs/debug.log log4j.appender.DebugLog.Append=true log4j.appender.DebugLog.Threshold=DEBUG log4j.appender.DebugLog.layout=org.apache.log4j.PatternLayout log4j.appender.DebugLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n -### 输出ERROR 级别以上的日志到=logs/error.log +### Output logs of ERROR level and above to logs/error.log log4j.appender.ErrorLog=org.apache.log4j.DailyRollingFileAppender log4j.appender.ErrorLog.File=logs/error.log log4j.appender.ErrorLog.Append=true log4j.appender.ErrorLog.Threshold=ERROR log4j.appender.ErrorLog.layout=org.apache.log4j.PatternLayout -log4j.appender.ErrorLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n \ No newline at end of file +log4j.appender.ErrorLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n diff --git a/docs/examples/JDBC/connectionPools/src/main/resources/proxool.xml b/docs/examples/JDBC/connectionPools/src/main/resources/proxool.xml index 0e2ac6368a..dcef51ffb5 100644 --- a/docs/examples/JDBC/connectionPools/src/main/resources/proxool.xml +++ b/docs/examples/JDBC/connectionPools/src/main/resources/proxool.xml @@ -1,27 +1,28 @@ + ds - + jdbc:TAOS-RS://127.0.0.1:6041/log - + com.taosdata.jdbc.rs.RestfulDriver - + - + 100 - + 100 - + 1 - + 5 - + 30000 - + select server_version() - \ No newline at end of file + diff --git a/docs/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java b/docs/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java index f6e21cd729..6823ca5b4d 100644 --- a/docs/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java +++ b/docs/examples/JDBC/consumer-demo/src/main/java/com/taosdata/Worker.java @@ -35,17 +35,18 @@ public class Worker implements Runnable { public void run() { while (!Thread.interrupted()) { try { - // 控制请求频率 + // Control request rate if (semaphore.tryAcquire()) { ConsumerRecords records = consumer.poll(Duration.ofMillis(sleepTime)); pool.submit(() -> { RateLimiter limiter = RateLimiter.create(rate); try { for (ConsumerRecord record : records) { - // 流量控制 + // Traffic control limiter.acquire(); - // 业务处理数据 - System.out.println("[" + LocalDateTime.now() + "] Thread id:" + Thread.currentThread().getId() + " -> " + record.value()); + // Business data processing + System.out.println("[" + LocalDateTime.now() + "] Thread id:" + + Thread.currentThread().getId() + " -> " + record.value()); } } finally { semaphore.release(); diff --git a/docs/examples/JDBC/mybatisplus-demo/readme b/docs/examples/JDBC/mybatisplus-demo/readme index b31b6c34bf..a4816d7631 100644 --- a/docs/examples/JDBC/mybatisplus-demo/readme +++ b/docs/examples/JDBC/mybatisplus-demo/readme @@ -1,14 +1,14 @@ -# 使用说明 +# Instructions -## 创建使用db +## Create and use the database ```shell $ taos > create database mp_test ``` -## 执行测试用例 +## Execute test cases ```shell $ mvn clean test -``` \ No newline at end of file +``` diff --git a/docs/examples/JDBC/springbootdemo/readme.md b/docs/examples/JDBC/springbootdemo/readme.md index 625d43e4ed..d11bb33c83 100644 --- a/docs/examples/JDBC/springbootdemo/readme.md +++ b/docs/examples/JDBC/springbootdemo/readme.md @@ -1,6 +1,6 @@ ## TDengine SpringBoot + Mybatis Demo -## 需要提前创建 test 数据库 +## Need to create a test database in advance ``` $ taos -s 'create database if not exists test' @@ -8,7 +8,7 @@ $ taos -s 'create database if not exists test' $ curl http://localhost:8080/weather/init ``` -### 配置 application.properties +### Configure application.properties ```properties # datasource config spring.datasource.driver-class-name=com.taosdata.jdbc.TSDBDriver @@ -38,9 +38,9 @@ mybatis.mapper-locations=classpath:mapper/*.xml logging.level.com.taosdata.jdbc.springbootdemo.dao=debug ``` -### 主要功能 +### Main functions -* 创建数据库和表 +* Create databases and tables ```xml @@ -52,14 +52,14 @@ logging.level.com.taosdata.jdbc.springbootdemo.dao=debug ``` -* 插入单条记录 +* Insert a single record ```xml insert into test.weather (ts, temperature, humidity) values (now, #{temperature,jdbcType=INTEGER}, #{humidity,jdbcType=FLOAT}) ``` -* 插入多条记录 +* Insert multiple records ```xml @@ -69,7 +69,7 @@ logging.level.com.taosdata.jdbc.springbootdemo.dao=debug ``` -* 分页查询 +* Pagination query ```xml diff --git a/docs/examples/JDBC/taosdemo/readme.md b/docs/examples/JDBC/taosdemo/readme.md index 986eef8a05..141391d1f6 100644 --- a/docs/examples/JDBC/taosdemo/readme.md +++ b/docs/examples/JDBC/taosdemo/readme.md @@ -1,11 +1,14 @@ ``` cd tests/examples/JDBC/taosdemo mvn clean package -Dmaven.test.skip=true -# 先建表,再插入的 +# Create tables first, then insert data java -jar target/taosdemo-2.0.1-jar-with-dependencies.jar -host -database -doCreateTable true -superTableSQL "create table weather(ts timestamp, f1 int) tags(t1 nchar(4))" -numOfTables 1000 -numOfRowsPerTable 100000000 -numOfThreadsForInsert 10 -numOfTablesPerSQL 10 -numOfValuesPerSQL 100 -# 不建表,直接插入的 +# Insert data directly without creating tables java -jar target/taosdemo-2.0.1-jar-with-dependencies.jar -host -database -doCreateTable false -superTableSQL "create table weather(ts timestamp, f1 int) tags(t1 nchar(4))" -numOfTables 1000 -numOfRowsPerTable 100000000 -numOfThreadsForInsert 10 -numOfTablesPerSQL 10 -numOfValuesPerSQL 100 ``` 如果发生错误 Exception in thread "main" java.lang.UnsatisfiedLinkError: no taos in java.library.path 请检查是否安装 TDengine 客户端安装包或编译 TDengine 安装。如果确定已经安装过还出现这个错误,可以在命令行 java 后加 -Djava.library.path=/usr/lib 来指定寻找共享库的路径。 + + +If you encounter the error Exception in thread "main" `java.lang.UnsatisfiedLinkError: no taos in java.library.path`, please check whether the TDengine client package is installed or TDengine is compiled and installed. If you are sure it is installed and still encounter this error, you can add `-Djava.library.path=/usr/lib` after the `java` command to specify the path to the shared library. diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java index 6854054703..40d44475b8 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/TaosDemoApplication.java @@ -24,14 +24,14 @@ public class TaosDemoApplication { private static final Logger logger = LogManager.getLogger(TaosDemoApplication.class); public static void main(String[] args) throws IOException { - // 读配置参数 + // Read configuration parameters JdbcTaosdemoConfig config = new JdbcTaosdemoConfig(args); boolean isHelp = Arrays.asList(args).contains("--help"); if (isHelp || config.host == null || config.host.isEmpty()) { JdbcTaosdemoConfig.printHelp(); System.exit(0); } - // 初始化 + // final DataSource dataSource = DataSourceFactory.getInstance(config.host, config.port, config.user, config.password); if (config.executeSql != null && !config.executeSql.isEmpty() @@ -50,7 +50,7 @@ public class TaosDemoApplication { final SuperTableService superTableService = new SuperTableService(dataSource); final SubTableService subTableService = new SubTableService(dataSource); - // 创建数据库 + // create database long start = System.currentTimeMillis(); Map databaseParam = new HashMap<>(); databaseParam.put("database", config.database); @@ -81,13 +81,13 @@ public class TaosDemoApplication { config.prefixOfFields, config.numOfTags, config.prefixOfTags); } /**********************************************************************************/ - // 建表 + // create table start = System.currentTimeMillis(); if (config.doCreateTable) { superTableService.drop(superTableMeta.getDatabase(), superTableMeta.getName()); superTableService.create(superTableMeta); if (!config.autoCreateTable) { - // 批量建子表 + // create sub tables in batch subTableService.createSubTable(superTableMeta, config.numOfTables, config.prefixOfTable, config.numOfThreadsForCreate); } @@ -95,7 +95,7 @@ public class TaosDemoApplication { end = System.currentTimeMillis(); logger.info(">>> create table time cost : " + (end - start) + " ms."); /**********************************************************************************/ - // 插入 + // insert data long tableSize = config.numOfTables; int threadSize = config.numOfThreadsForInsert; long startTime = getProperStartTime(config.startTime, config.days); @@ -111,10 +111,9 @@ public class TaosDemoApplication { end = System.currentTimeMillis(); logger.info("insert " + affectedRows + " rows, time cost: " + (end - start) + " ms"); /**********************************************************************************/ - // 查询 /**********************************************************************************/ - // 删除表 + // drop table if (config.dropTable) { superTableService.drop(config.database, config.superTable); } diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SubTableMapper.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SubTableMapper.java index e0ddd220c1..013d24eb87 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SubTableMapper.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SubTableMapper.java @@ -9,21 +9,22 @@ import java.util.List; @Repository public interface SubTableMapper { - // 创建:子表 + // Create: SubTable void createUsingSuperTable(SubTableMeta subTableMeta); - // 插入:一张子表多个values + // Insert: Multiple records into one SubTable int insertOneTableMultiValues(SubTableValue subTableValue); - // 插入:一张子表多个values, 自动建表 + // Insert: Multiple records into one SubTable, auto create SubTables int insertOneTableMultiValuesUsingSuperTable(SubTableValue subTableValue); - // 插入:多张表多个values + // Insert: Multiple records into multiple SubTable int insertMultiTableMultiValues(List tables); - // 插入:多张表多个values,自动建表 + // Insert: Multiple records into multiple SubTable, auto create SubTables int insertMultiTableMultiValuesUsingSuperTable(List tables); - // + // -} \ No newline at end of file +} diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SuperTableMapper.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SuperTableMapper.java index 9f8cec9e8f..15cafd04fb 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SuperTableMapper.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/SuperTableMapper.java @@ -6,24 +6,26 @@ import org.springframework.stereotype.Repository; @Repository public interface SuperTableMapper { - // 创建超级表 create table if not exists xxx.xxx (f1 type1, f2 type2, ... ) tags( t1 type1, t2 type2 ...) + // Create super table: create table if not exists xxx.xxx (f1 type1, f2 type2, + // ... ) tags( t1 type1, t2 type2 ...) void createSuperTable(SuperTableMeta tableMetadata); - // 删除超级表 drop table if exists xxx; + // Drop super table: drop table if exists xxx; void dropSuperTable(String database, String name); - // + // - // + // - // + // - // + // - // + // - // - - // + // + // } diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/TableMapper.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/TableMapper.java index 32d1875e4d..c0f75d2204 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/TableMapper.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/dao/TableMapper.java @@ -9,19 +9,18 @@ import java.util.List; @Repository public interface TableMapper { - // 创建:普通表 + // Create: Normal table void create(TableMeta tableMeta); - // 插入:一张表多个value + // Insert: Multiple records into one table int insertOneTableMultiValues(TableValue values); - // 插入: 一张表多个value,指定的列 + // Insert: Multiple records into one table, specified columns int insertOneTableMultiValuesWithColumns(TableValue values); - // 插入:多个表多个value + // Insert: Multiple records into multiple tables int insertMultiTableMultiValues(List tables); - // 插入:多个表多个value, 指定的列 + // Insert: Multiple records into multiple tables, specified columns int insertMultiTableMultiValuesWithColumns(List tables); - -} \ No newline at end of file +} diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/DatabaseService.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/DatabaseService.java index 3c8e962406..68ddd78323 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/DatabaseService.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/DatabaseService.java @@ -14,12 +14,12 @@ public class DatabaseService { this.databaseMapper = new DatabaseMapperImpl(dataSource); } - // 建库,指定 name + // Create database with specified name public void createDatabase(String database) { databaseMapper.createDatabase(database); } - // 建库,指定参数 keep,days,replica等 + // Create database with specified parameters such as keep, days, replica, etc. public void createDatabase(Map map) { if (map.isEmpty()) return; diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java index b0a79dea78..690b02f065 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SubTableService.java @@ -27,7 +27,8 @@ public class SubTableService extends AbstractService { this.mapper = new SubTableMapperImpl(datasource); } - public void createSubTable(SuperTableMeta superTableMeta, long numOfTables, String prefixOfTable, int numOfThreadsForCreate) { + public void createSubTable(SuperTableMeta superTableMeta, long numOfTables, String prefixOfTable, + int numOfThreadsForCreate) { ExecutorService executor = Executors.newFixedThreadPool(numOfThreadsForCreate); for (long i = 0; i < numOfTables; i++) { long tableIndex = i; @@ -35,54 +36,58 @@ public class SubTableService extends AbstractService { } executor.shutdown(); try { - executor.awaitTermination(Long.MAX_VALUE,TimeUnit.NANOSECONDS); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } public void createSubTable(SuperTableMeta superTableMeta, String tableName) { - // 构造数据 + // Construct data SubTableMeta meta = SubTableMetaGenerator.generate(superTableMeta, tableName); createSubTable(meta); } - // 创建一张子表,可以指定database,supertable,tablename,tag值 + // Create a sub-table, specifying database, super table, table name, and tag + // values public void createSubTable(SubTableMeta subTableMeta) { mapper.createUsingSuperTable(subTableMeta); } /*************************************************************************************************************************/ - // 插入:多线程,多表 + // Insert: Multi-threaded, multiple tables public int insert(List subTableValues, int threadSize, int frequency) { ExecutorService executor = Executors.newFixedThreadPool(threadSize); Future future = executor.submit(() -> insert(subTableValues)); executor.shutdown(); - //TODO:frequency + // TODO:frequency return getAffectRows(future); } - // 插入:单表,insert into xxx values(),()... + // Insert: Single table, insert into xxx values(),()... public int insert(SubTableValue subTableValue) { return mapper.insertOneTableMultiValues(subTableValue); } - // 插入: 多表,insert into xxx values(),()... xxx values(),()... + // Insert: Multiple tables, insert into xxx values(),()... xxx values(),()... public int insert(List subTableValues) { return mapper.insertMultiTableMultiValues(subTableValues); } - // 插入:单表,自动建表, insert into xxx using xxx tags(...) values(),()... + // Insert: Single table, auto-create table, insert into xxx using xxx tags(...) + // values(),()... public int insertAutoCreateTable(SubTableValue subTableValue) { return mapper.insertOneTableMultiValuesUsingSuperTable(subTableValue); } - // 插入:多表,自动建表, insert into xxx using XXX tags(...) values(),()... xxx using XXX tags(...) values(),()... + // Insert: Multiple tables, auto-create tables, insert into xxx using XXX + // tags(...) values(),()... xxx using XXX tags(...) values(),()... public int insertAutoCreateTable(List subTableValues) { return mapper.insertMultiTableMultiValuesUsingSuperTable(subTableValues); } - public int insertMultiThreads(SuperTableMeta superTableMeta, int threadSize, long tableSize, long startTime, long gap, JdbcTaosdemoConfig config) { + public int insertMultiThreads(SuperTableMeta superTableMeta, int threadSize, long tableSize, long startTime, + long gap, JdbcTaosdemoConfig config) { List taskList = new ArrayList<>(); List threads = IntStream.range(0, threadSize) .mapToObj(i -> { @@ -94,8 +99,7 @@ public class SubTableService extends AbstractService { startTime, config.timeGap, config.numOfRowsPerTable, config.numOfTablesPerSQL, config.numOfValuesPerSQL, config.order, config.rate, config.range, - config.prefixOfTable, config.autoCreateTable) - ); + config.prefixOfTable, config.autoCreateTable)); taskList.add(task); return new Thread(task, "InsertThread-" + i); }).collect(Collectors.toList()); @@ -126,7 +130,7 @@ public class SubTableService extends AbstractService { private class InsertTask implements Callable { private final long startTableInd; // included - private final long endTableInd; // excluded + private final long endTableInd; // excluded private final long startTime; private final long timeGap; private final long numOfRowsPerTable; @@ -140,10 +144,10 @@ public class SubTableService extends AbstractService { private final boolean autoCreateTable; public InsertTask(SuperTableMeta superTableMeta, long startTableInd, long endTableInd, - long startTime, long timeGap, - long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL, - int order, int rate, long range, - String prefixOfTable, boolean autoCreateTable) { + long startTime, long timeGap, + long numOfRowsPerTable, long numOfTablesPerSQL, long numOfValuesPerSQL, + int order, int rate, long range, + String prefixOfTable, boolean autoCreateTable) { this.superTableMeta = superTableMeta; this.startTableInd = startTableInd; this.endTableInd = endTableInd; @@ -159,7 +163,6 @@ public class SubTableService extends AbstractService { this.autoCreateTable = autoCreateTable; } - @Override public Integer call() { @@ -171,23 +174,27 @@ public class SubTableService extends AbstractService { int affectRows = 0; // row - for (long rowCnt = 0; rowCnt < numOfRowsPerTable; ) { + for (long rowCnt = 0; rowCnt < numOfRowsPerTable;) { long rowSize = numOfValuesPerSQL; if (rowCnt + rowSize > numOfRowsPerTable) { rowSize = numOfRowsPerTable - rowCnt; } - //table - for (long tableCnt = startTableInd; tableCnt < endTableInd; ) { + // table + for (long tableCnt = startTableInd; tableCnt < endTableInd;) { long tableSize = numOfTablesPerSQL; if (tableCnt + tableSize > endTableInd) { tableSize = endTableInd - tableCnt; } long startTime = this.startTime + rowCnt * timeGap; -// System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + timeGap + ""); + // System.out.println(Thread.currentThread().getName() + " >>> " + "rowCnt: " + + // rowCnt + ", rowSize: " + rowSize + ", " + "tableCnt: " + tableCnt + + // ",tableSize: " + tableSize + ", " + "startTime: " + startTime + ",timeGap: " + // + timeGap + ""); /***********************************************/ - // 生成数据 - List data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, tableSize, rowSize, startTime, timeGap); - // 乱序 + // Construct data + List data = SubTableValueGenerator.generate(superTableMeta, prefixOfTable, tableCnt, + tableSize, rowSize, startTime, timeGap); + // disorder if (order != 0) SubTableValueGenerator.disrupt(data, rate, range); // insert @@ -205,5 +212,4 @@ public class SubTableService extends AbstractService { } } - } diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SuperTableService.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SuperTableService.java index b91348e2d0..47798e0c4e 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SuperTableService.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/SuperTableService.java @@ -14,7 +14,7 @@ public class SuperTableService { this.superTableMapper = new SuperTableMapperImpl(dataSource); } - // 创建超级表,指定每个field的名称和类型,每个tag的名称和类型 + // Create super table, specifying the name and type of each field and each tag public void create(SuperTableMeta superTableMeta) { superTableMapper.createSuperTable(superTableMeta); } diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/TableService.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/TableService.java index 2504fdb0b4..2bfb963b4a 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/TableService.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/TableService.java @@ -11,15 +11,14 @@ public class TableService extends AbstractService { private TableMapper tableMapper; - //创建一张表 + // Create a table public void create(TableMeta tableMeta) { tableMapper.create(tableMeta); } - //创建多张表 + // Create multiple tables public void create(List tables) { tables.stream().forEach(this::create); } - } diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/FieldValueGenerator.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/FieldValueGenerator.java index 73cd981a46..b3ce35231c 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/FieldValueGenerator.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/FieldValueGenerator.java @@ -11,7 +11,8 @@ public class FieldValueGenerator { public static Random random = new Random(System.currentTimeMillis()); - // 生成start到end的时间序列,时间戳为顺序,不含有乱序,field的value为随机生成 + // Generate a time series from start to end, timestamps are in order without + // disorder, field values are randomly generated public static List generate(long start, long end, long timeGap, List fieldMetaList) { List values = new ArrayList<>(); @@ -29,9 +30,12 @@ public class FieldValueGenerator { return values; } - // 生成start到end的时间序列,时间戳为顺序,含有乱序,rate为乱序的比例,range为乱序前跳范围,field的value为随机生成 + // Generate a time series from start to end, timestamps are in order but include + // disorder, rate is the proportion of disorder, range is the jump range before + // disorder, field values are randomly generated public static List disrupt(List values, int rate, long range) { - long timeGap = (long) (values.get(1).getFields().get(0).getValue()) - (long) (values.get(0).getFields().get(0).getValue()); + long timeGap = (long) (values.get(1).getFields().get(0).getValue()) + - (long) (values.get(0).getFields().get(0).getValue()); int bugSize = values.size() * rate / 100; Set bugIndSet = new HashSet<>(); while (bugIndSet.size() < bugSize) { diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SubTableMetaGenerator.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SubTableMetaGenerator.java index 88e3c0d26a..7bdd72ec3b 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SubTableMetaGenerator.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SubTableMetaGenerator.java @@ -9,7 +9,7 @@ import java.util.List; public class SubTableMetaGenerator { - // 创建tableSize张子表,使用tablePrefix作为子表名的前缀,使用superTableMeta的元数据 + // Create tableSize sub-tables, using tablePrefix as the prefix for sub-table names, and using the metadata from superTableMeta // create table xxx using XXX tags(XXX) public static List generate(SuperTableMeta superTableMeta, int tableSize, String tablePrefix) { List subTableMetaList = new ArrayList<>(); diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SuperTableMetaGenerator.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SuperTableMetaGenerator.java index 05aefd01ac..383b492c5a 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SuperTableMetaGenerator.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/SuperTableMetaGenerator.java @@ -10,10 +10,11 @@ import java.util.List; public class SuperTableMetaGenerator { - // 创建超级表,使用指定SQL语句 + // Create super table using the specified SQL statement public static SuperTableMeta generate(String superTableSQL) { SuperTableMeta tableMeta = new SuperTableMeta(); - // for example : create table superTable (ts timestamp, temperature float, humidity int) tags(location nchar(64), groupId int) + // for example : create table superTable (ts timestamp, temperature float, + // humidity int) tags(location nchar(64), groupId int) superTableSQL = superTableSQL.trim().toLowerCase(); if (!superTableSQL.startsWith("create")) throw new RuntimeException("invalid create super table SQL"); @@ -54,8 +55,9 @@ public class SuperTableMetaGenerator { return tableMeta; } - // 创建超级表,指定field和tag的个数 - public static SuperTableMeta generate(String database, String name, int fieldSize, String fieldPrefix, int tagSize, String tagPrefix) { + // Create super table with specified number of fields and tags + public static SuperTableMeta generate(String database, String name, int fieldSize, String fieldPrefix, int tagSize, + String tagPrefix) { if (fieldSize < 2 || tagSize < 1) { throw new RuntimeException("create super table but fieldSize less than 2 or tagSize less than 1"); } @@ -66,7 +68,8 @@ public class SuperTableMetaGenerator { List fields = new ArrayList<>(); fields.add(new FieldMeta("ts", "timestamp")); for (int i = 1; i <= fieldSize; i++) { - fields.add(new FieldMeta(fieldPrefix + "" + i, TaosConstants.DATA_TYPES[i % TaosConstants.DATA_TYPES.length])); + fields.add( + new FieldMeta(fieldPrefix + "" + i, TaosConstants.DATA_TYPES[i % TaosConstants.DATA_TYPES.length])); } tableMetadata.setFields(fields); // tags diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/TagValueGenerator.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/TagValueGenerator.java index b8024fea45..f7b18ca7cf 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/TagValueGenerator.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/service/data/TagValueGenerator.java @@ -9,7 +9,7 @@ import java.util.List; public class TagValueGenerator { - // 创建标签值:使用tagMetas + // Create tag values using tagMetas public static List generate(List tagMetas) { List tagValues = new ArrayList<>(); for (int i = 0; i < tagMetas.size(); i++) { diff --git a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/utils/TimeStampUtil.java b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/utils/TimeStampUtil.java index 9cfce16d82..53748169ae 100644 --- a/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/utils/TimeStampUtil.java +++ b/docs/examples/JDBC/taosdemo/src/main/java/com/taosdata/taosdemo/utils/TimeStampUtil.java @@ -41,17 +41,17 @@ public class TimeStampUtil { if (start == 0) start = now - size * timeGap; - // 如果size小于1异常 + // If size is less than 1, throw an exception if (size < 1) throw new IllegalArgumentException("size less than 1."); - // 如果timeGap为1,已经超长,需要前移start + // If timeGap is 1 and it exceeds the limit, move start forward if (start + size > now) { start = now - size; return new TimeTuple(start, now, 1); } long end = start + (long) (timeGap * size); if (end > now) { - //压缩timeGap + // Compress timeGap end = now; double gap = (end - start) / (size * 1.0f); if (gap < 1.0f) { diff --git a/docs/examples/JDBC/taosdemo/src/main/resources/log4j.properties b/docs/examples/JDBC/taosdemo/src/main/resources/log4j.properties index 352545854d..40b1478a24 100644 --- a/docs/examples/JDBC/taosdemo/src/main/resources/log4j.properties +++ b/docs/examples/JDBC/taosdemo/src/main/resources/log4j.properties @@ -1,21 +1,21 @@ -### 设置### +### Settings ### log4j.rootLogger=info,stdout -### 输出信息到控制抬 ### +### Output information to the console ### log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n -### 输出DEBUG 级别以上的日志到=logs/debug.log +### Output logs of DEBUG level and above to logs/debug.log ### log4j.appender.DebugLog=org.apache.log4j.DailyRollingFileAppender log4j.appender.DebugLog.File=logs/debug.log log4j.appender.DebugLog.Append=true log4j.appender.DebugLog.Threshold=DEBUG log4j.appender.DebugLog.layout=org.apache.log4j.PatternLayout log4j.appender.DebugLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n -### 输出ERROR 级别以上的日志到=logs/error.log +### Output logs of ERROR level and above to logs/error.log ### log4j.appender.ErrorLog=org.apache.log4j.DailyRollingFileAppender log4j.appender.ErrorLog.File=logs/error.log log4j.appender.ErrorLog.Append=true log4j.appender.ErrorLog.Threshold=ERROR log4j.appender.ErrorLog.layout=org.apache.log4j.PatternLayout -log4j.appender.ErrorLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n \ No newline at end of file +log4j.appender.ErrorLog.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n From bc8520363d35bc173c3af699888a93e2157482f3 Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Tue, 1 Oct 2024 10:44:52 +0800 Subject: [PATCH 2/3] mod java code comments --- source/libs/stream/src/streamMsg.c | 855 ----------------------------- 1 file changed, 855 deletions(-) delete mode 100644 source/libs/stream/src/streamMsg.c diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c deleted file mode 100644 index 1c512888e7..0000000000 --- a/source/libs/stream/src/streamMsg.c +++ /dev/null @@ -1,855 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "streamMsg.h" -#include "os.h" -#include "tstream.h" -#include "streamInt.h" - -int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId)); - TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage)); - return 0; -} - -int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) { - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId)); - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId)); - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId)); - TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet)); - TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage)); - return 0; -} - -int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { - TAOS_CHECK_RETURN(tStartEncode(pEncoder)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId)); - TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId)); - TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger)); - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { - TAOS_CHECK_RETURN(tStartDecode(pDecoder)); - TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId)); - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId)); - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId)); - TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps)); - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId)); - TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime)); - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId)); - TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger)); - tEndDecode(pDecoder); - return 0; -} - -int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { - TAOS_CHECK_RETURN(tStartEncode(pEncoder)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId)); - TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId)); - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime)); - TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success)); - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId)); - - int32_t size = taosArrayGetSize(pMsg->pNodeList); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); - - for (int32_t i = 0; i < size; ++i) { - SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); - if (pInfo == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp)); - } - - // todo this new attribute will be result in being incompatible with previous version - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId)); - tEndEncode(pEncoder); -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId)); - - int32_t size = 0; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); - pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); - if (pMsg->pNodeList == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < size; ++i) { - SNodeUpdateInfo info = {0}; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp)); - - if (taosArrayPush(pMsg->pNodeList, &info) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId)); - - tEndDecode(pDecoder); -_exit: - return code; -} - -int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage)); - tEndEncode(pEncoder); - -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage)); - tEndDecode(pDecoder); - -_exit: - return code; -} - -int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status)); - tEndEncode(pEncoder); - -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status)); - tEndDecode(pDecoder); - -_exit: - return code; -} - -int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId)); - tEndEncode(pEncoder); - -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId)); - tEndDecode(pDecoder); - -_exit: - return code; -} - -int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen)); - - if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { - stError("invalid dispatch req msg"); - TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); - } - - for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t* pLen = taosArrayGet(pReq->dataLen, i); - void* data = taosArrayGetP(pReq->data, i); - if (data == NULL || pLen == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen)); - TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen)); - } - tEndEncode(pEncoder); -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen)); - - if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < pReq->blockNum; i++) { - int32_t len1; - uint64_t len2; - void* data; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1)); - TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2)); - - if (len1 != len2) { - TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); - } - - if (taosArrayPush(pReq->dataLen, &len1) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - if (taosArrayPush(pReq->data, &data) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - tEndDecode(pDecoder); -_exit: - return code; -} - -void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { - taosArrayDestroyP(pReq->data, taosMemoryFree); - taosArrayDestroy(pReq->dataLen); -} - -int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId)); - TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen)); - tEndEncode(pEncoder); - -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId)); - uint64_t len = 0; - TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len)); - pReq->retrieveLen = (int32_t)len; - tEndDecode(pDecoder); - -_exit: - return code; -} - -void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } - -int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); - tEndEncode(pEncoder); - -_exit: - return code; -} - -int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); - tEndDecode(pDecoder); - -_exit: - return code; -} - -int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks)); - - for (int32_t i = 0; i < pReq->numOfTasks; ++i) { - STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); - if (ps == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->id.streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->id.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->status)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->stage)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->nodeId)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputQUsed)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputRate)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsTotal)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsThroughput)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputTotal)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputThroughput)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkQuota)); - TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkDataSize)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->processedVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.minVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.maxVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.activeId)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.failed)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestTime)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestSize)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startTime)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->hTaskId)); - } - - int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs)); - - for (int j = 0; j < numOfVgs; ++j) { - int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); - if (pVgId == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId)); - } - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts)); - tEndEncode(pEncoder); - -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks)); - - if ((pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry))) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < pReq->numOfTasks; ++i) { - int32_t taskId = 0; - STaskStatusEntry entry = {0}; - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota)); - TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId)); - - entry.id.taskId = taskId; - if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - int32_t numOfVgs = 0; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs)); - - if ((pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t))) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - - for (int j = 0; j < numOfVgs; ++j) { - int32_t vgId = 0; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId)); - if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts)); - tEndDecode(pDecoder); - -_exit: - return code; -} - -void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { - if (pMsg == NULL) { - return; - } - - if (pMsg->pUpdateNodes != NULL) { - taosArrayDestroy(pMsg->pUpdateNodes); - pMsg->pUpdateNodes = NULL; - } - - if (pMsg->pTaskStatus != NULL) { - taosArrayDestroy(pMsg->pTaskStatus); - pMsg->pTaskStatus = NULL; - } - - pMsg->msgId = -1; - pMsg->vgId = -1; - pMsg->numOfTasks = -1; -} - -int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); - int32_t taskId = pTask->hTaskInfo.id.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); - taskId = pTask->streamTaskId.taskId; - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); - - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); - - int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); - TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); - TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); - - tEndEncode(pEncoder); -_exit: - return code; -} - -int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - int32_t taskId = 0; - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { - TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); - } - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); - TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); - - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); - - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->hTaskInfo.id.taskId = taskId; - - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); - pTask->streamTaskId.taskId = taskId; - - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); - TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); - - int32_t epSz = -1; - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); - - if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - for (int32_t i = 0; i < epSz; i++) { - SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); - if (pInfo == NULL) { - TAOS_CHECK_EXIT(terrno); - } - if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { - taosMemoryFreeClear(pInfo); - goto _exit; - } - if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { - TAOS_CHECK_EXIT(terrno); - } - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); - pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { - TAOS_CHECK_EXIT(terrno); - } - TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); - TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); - } - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); - } - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); - - tEndDecode(pDecoder); - -_exit: - return code; -} - -int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId)); - TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask)); - tEndEncode(pEncoder); - -_exit: - return code; -} - -int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask)); - tEndDecode(pDecoder); - -_exit: - return code; -} - -int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartEncode(pEncoder)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); - TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); - tEndEncode(pEncoder); - -_exit: - if (code) { - return code; - } else { - return pEncoder->pos; - } -} - -int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { - int32_t code = 0; - int32_t lino; - - TAOS_CHECK_EXIT(tStartDecode(pDecoder)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); - TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); - tEndDecode(pDecoder); - -_exit: - return code; -} From 7a1a68f435f55d60db5e88bb4ac5235f26f60618 Mon Sep 17 00:00:00 2001 From: sheyanjie-qq <249478495@qq.com> Date: Tue, 1 Oct 2024 10:56:25 +0800 Subject: [PATCH 3/3] recover streamMsg.c --- source/libs/stream/src/streamMsg.c | 855 +++++++++++++++++++++++++++++ 1 file changed, 855 insertions(+) create mode 100644 source/libs/stream/src/streamMsg.c diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c new file mode 100644 index 0000000000..1c512888e7 --- /dev/null +++ b/source/libs/stream/src/streamMsg.c @@ -0,0 +1,855 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "streamMsg.h" +#include "os.h" +#include "tstream.h" +#include "streamInt.h" + +int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->taskId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->nodeId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pInfo->childId)); + TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pInfo->epSet)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pInfo->stage)); + return 0; +} + +int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamUpstreamEpInfo* pInfo) { + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->taskId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->nodeId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pInfo->childId)); + TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pInfo->epSet)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pInfo->stage)); + return 0; +} + +int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { + TAOS_CHECK_RETURN(tStartEncode(pEncoder)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->nodeId)); + TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pReq->mgmtEps)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->mnodeId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->expireTime)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->transId)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pReq->mndTrigger)); + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq) { + TAOS_CHECK_RETURN(tStartDecode(pDecoder)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->nodeId)); + TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pReq->mgmtEps)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->mnodeId)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->expireTime)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->transId)); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pReq->mndTrigger)); + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp) { + TAOS_CHECK_RETURN(tStartEncode(pEncoder)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->checkpointId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->taskId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->nodeId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->expireTime)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->success)); + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pMsg->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->taskId)); + + int32_t size = taosArrayGetSize(pMsg->pNodeList); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, size)); + + for (int32_t i = 0; i < size; ++i) { + SNodeUpdateInfo* pInfo = taosArrayGet(pMsg->pNodeList, i); + if (pInfo == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pInfo->nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->prevEp)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pInfo->newEp)); + } + + // todo this new attribute will be result in being incompatible with previous version + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId)); + tEndEncode(pEncoder); +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pMsg->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->taskId)); + + int32_t size = 0; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size)); + pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); + if (pMsg->pNodeList == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < size; ++i) { + SNodeUpdateInfo info = {0}; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.prevEp)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &info.newEp)); + + if (taosArrayPush(pMsg->pNodeList, &info) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId)); + + tEndDecode(pDecoder); +_exit: + return code; +} + +int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage)); + tEndEncode(pEncoder); + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->childId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->reqId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->downstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->childId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->oldStage)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pRsp->status)); + tEndEncode(pEncoder); + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->reqId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->oldStage)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pRsp->status)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->childId)); + tEndEncode(pEncoder); + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->downstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->childId)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->stage)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcVgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->type)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamRelTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->blockNum)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->totalLen)); + + if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { + stError("invalid dispatch req msg"); + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } + + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t* pLen = taosArrayGet(pReq->dataLen, i); + void* data = taosArrayGetP(pReq->data, i); + if (data == NULL || pLen == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pLen)); + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, data, *pLen)); + } + tEndEncode(pEncoder); +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->stage)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcVgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->type)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamRelTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->blockNum)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->totalLen)); + + if ((pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if ((pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < pReq->blockNum; i++) { + int32_t len1; + uint64_t len2; + void* data; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &len1)); + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &len2)); + + if (len1 != len2) { + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } + + if (taosArrayPush(pReq->dataLen, &len1) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + if (taosArrayPush(pReq->data, &data) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + tEndDecode(pDecoder); +_exit: + return code; +} + +void tCleanupStreamDispatchReq(SStreamDispatchReq* pReq) { + taosArrayDestroyP(pReq->data, taosMemoryFree); + taosArrayDestroy(pReq->dataLen); +} + +int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->reqId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->dstTaskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcNodeId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->srcTaskId)); + TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t*)pReq->pRetrieve, pReq->retrieveLen)); + tEndEncode(pEncoder); + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->reqId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->dstTaskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcNodeId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->srcTaskId)); + uint64_t len = 0; + TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, (void**)&pReq->pRetrieve, &len)); + pReq->retrieveLen = (int32_t)len; + tEndDecode(pDecoder); + +_exit: + return code; +} + +void tCleanupStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); } + +int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->vgId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->numOfTasks)); + + for (int32_t i = 0; i < pReq->numOfTasks; ++i) { + STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); + if (ps == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->status)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->stage)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->nodeId)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputQUsed)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->inputRate)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsTotal)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->procsThroughput)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputTotal)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->outputThroughput)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkQuota)); + TAOS_CHECK_EXIT(tEncodeDouble(pEncoder, ps->sinkDataSize)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->processedVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.minVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->verRange.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.activeId)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.failed)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestTime)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.latestSize)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.remoteBackup)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, ps->checkpointInfo.consensusChkptId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->checkpointInfo.consensusTs)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startTime)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->startCheckpointVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, ps->hTaskId)); + } + + int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfVgs)); + + for (int j = 0; j < numOfVgs; ++j) { + int32_t* pVgId = taosArrayGet(pReq->pUpdateNodes, j); + if (pVgId == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pVgId)); + } + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->msgId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->ts)); + tEndEncode(pEncoder); + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->vgId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->numOfTasks)); + + if ((pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < pReq->numOfTasks; ++i) { + int32_t taskId = 0; + STaskStatusEntry entry = {0}; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.status)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.stage)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.nodeId)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputQUsed)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.inputRate)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsTotal)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.procsThroughput)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputTotal)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.outputThroughput)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkQuota)); + TAOS_CHECK_EXIT(tDecodeDouble(pDecoder, &entry.sinkDataSize)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.processedVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.minVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.verRange.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.activeId)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.failed)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.latestSize)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.remoteBackup)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &entry.checkpointInfo.consensusChkptId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.checkpointInfo.consensusTs)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startTime)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.startCheckpointVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &entry.hTaskId)); + + entry.id.taskId = taskId; + if (taosArrayPush(pReq->pTaskStatus, &entry) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + int32_t numOfVgs = 0; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &numOfVgs)); + + if ((pReq->pUpdateNodes = taosArrayInit(numOfVgs, sizeof(int32_t))) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + + for (int j = 0; j < numOfVgs; ++j) { + int32_t vgId = 0; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &vgId)); + if (taosArrayPush(pReq->pUpdateNodes, &vgId) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->msgId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->ts)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { + if (pMsg == NULL) { + return; + } + + if (pMsg->pUpdateNodes != NULL) { + taosArrayDestroy(pMsg->pUpdateNodes); + pMsg->pUpdateNodes = NULL; + } + + if (pMsg->pTaskStatus != NULL) { + taosArrayDestroy(pMsg->pTaskStatus); + pMsg->pTaskStatus = NULL; + } + + pMsg->msgId = -1; + pMsg->vgId = -1; + pMsg->numOfTasks = -1; +} + +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)); + int32_t taskId = pTask->hTaskInfo.id.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->streamTaskId.streamId)); + taskId = pTask->streamTaskId.taskId; + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, taskId)); + + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->dataRange.window.ekey)); + + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, epSz)); + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + TAOS_CHECK_EXIT(tEncodeStreamEpInfo(pEncoder, pInfo)); + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName)); + TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->info.delaySchedParam)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5)); + TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1)); + + tEndEncode(pEncoder); +_exit: + return code; +} + +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->ver)); + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) { + TAOS_CHECK_EXIT(TSDB_CODE_INVALID_MSG); + } + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); + TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); + + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.taskStatus)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->status.schedStatus)); + + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.selfChildId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.epSet)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.fillHistory)); + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->hTaskInfo.id.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &taskId)); + pTask->streamTaskId.taskId = taskId; + + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)); + TAOS_CHECK_EXIT(tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.skey)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)); + + int32_t epSz = -1; + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &epSz) < 0); + + if ((pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES)) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + for (int32_t i = 0; i < epSz; i++) { + SStreamUpstreamEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamUpstreamEpInfo)); + if (pInfo == NULL) { + TAOS_CHECK_EXIT(terrno); + } + if ((code = tDecodeStreamEpInfo(pDecoder, pInfo)) < 0) { + taosMemoryFreeClear(pInfo); + goto _exit; + } + if (taosArrayPush(pTask->upstreamInfo.pList, &pInfo) == NULL) { + TAOS_CHECK_EXIT(terrno); + } + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + TAOS_CHECK_EXIT(tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg)); + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName)); + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId)); + TAOS_CHECK_EXIT(tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet)); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + TAOS_CHECK_EXIT(tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName)); + } + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->info.delaySchedParam)); + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->subtableWithoutMd5)); + } + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve)); + + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointVer)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointTs)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId)); + TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pReq->dropHTask)); + tEndEncode(pEncoder); + +_exit: + return code; +} + +int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointVer)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointTs)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pReq->dropHTask)); + tEndDecode(pDecoder); + +_exit: + return code; +} + +int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartEncode(pEncoder)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->startTs)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId)); + TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->transId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->nodeId)); + tEndEncode(pEncoder); + +_exit: + if (code) { + return code; + } else { + return pEncoder->pos; + } +} + +int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) { + int32_t code = 0; + int32_t lino; + + TAOS_CHECK_EXIT(tStartDecode(pDecoder)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->startTs)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId)); + TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->transId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->nodeId)); + tEndDecode(pDecoder); + +_exit: + return code; +}