diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 59986a3b3c..fdb9f102f0 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -274,7 +274,7 @@ if(${BUILD_WITH_ROCKSDB}) option(WITH_TOOLS "" OFF) option(WITH_LIBURING "" OFF) IF (TD_LINUX) - option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON) + option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) ELSE() option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) ENDIF() diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index db49e5f395..9c5a852c70 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -32,25 +32,22 @@ TDengine's JDBC driver implementation is as consistent as possible with the rela Native connections are supported on the same platforms as the TDengine client driver. REST connection supports all platforms that can run Java. -## Version support - -Please refer to [version support list](/reference/connector#version-support) - ## Recent update logs -| taos-jdbcdriver version | major changes | -| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | -| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | -| 3.2.0 | This version has been deprecated | -| 3.1.0 | JDBC REST connection supports subscription over WebSocket | -| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | -| 3.0.0 | Support for TDengine 3.0 | -| 2.0.42 | fix wasNull interface return value in WebSocket connection | -| 2.0.41 | fix decode method of username and password in REST connection | -| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | -| 2.0.38 | JDBC REST connections add bulk pull function | -| 2.0.37 | Support json tags | -| 2.0.36 | Support schemaless writing | +| taos-jdbcdriver version | major changes | TDengine version | +| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: | +| 3.2.1 | subscription add seek function | 3.0.5.0 or later | +| 3.2.1 | JDBC REST connection supports schemaless/prepareStatement over WebSocket | 3.0.3.0 or later | +| 3.2.0 | This version has been deprecated | - | +| 3.1.0 | JDBC REST connection supports subscription over WebSocket | - | +| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment | - | +| 3.0.0 | Support for TDengine 3.0 | 3.0.0.0 or later | +| 2.0.42 | fix wasNull interface return value in WebSocket connection | - | +| 2.0.41 | fix decode method of username and password in REST connection | - | +| 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | - | +| 2.0.38 | JDBC REST connections add bulk pull function | - | +| 2.0.37 | Support json tags | - | +| 2.0.36 | Support schemaless writing | - | **Note**: adding `batchfetch` to the REST connection and setting it to true will enable the WebSocket connection. @@ -102,6 +99,8 @@ For specific error codes, please refer to. | 0x2319 | user is required | The user name information is missing when creating the connection | | 0x231a | password is required | Password information is missing when creating a connection | | 0x231c | httpEntity is null, sql: | Execution exception occurred during the REST connection | +| 0x231d | can't create connection with server within | Increase the connection time by adding the httpConnectTimeout parameter, or check the connection to the taos adapter. | +| 0x231e | failed to complete the task within the specified time | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to the taos adapter. | | 0x2350 | unknown error | Unknown exception, please return to the developer on github. | | 0x2352 | Unsupported encoding | An unsupported character encoding set is specified under the native Connection. | | 0x2353 | internal error of database, please see taoslog for more details | An error occurs when the prepare statement is executed on the native connection. Check the taos log to locate the fault. | @@ -117,8 +116,8 @@ For specific error codes, please refer to. | 0x2376 | failed to set consumer topic, topic name is empty | During data subscription creation, the subscription topic name is empty. Check that the specified topic name is correct. | | 0x2377 | consumer reference has been destroyed | The subscription data transfer channel has been closed. Please check the connection to TDengine. | | 0x2378 | consumer create error | Failed to create a data subscription. Check the taos log according to the error message to locate the fault. | -| - | can't create connection with server within | Increase the connection time by adding the httpConnectTimeout parameter, or check the connection to the taos adapter. | -| - | failed to complete the task within the specified time | Increase the execution time by adding the messageWaitTimeout parameter, or check the connection to the taos adapter. | +| 0x2379 | seek offset must not be a negative number | The seek interface parameter cannot be negative. Use the correct parameter | +| 0x237a | vGroup not found in result set | subscription is not bound to the VGroup due to the rebalance mechanism | - [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java) @@ -169,7 +168,7 @@ Add following dependency in the `pom.xml` file of your Maven project: com.taosdata.jdbc taos-jdbcdriver - 3.2.1 + 3.2.2 ``` @@ -913,14 +912,15 @@ public class SchemalessWsTest { public static void main(String[] args) throws SQLException { final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata&batchfetch=true"; - Connection connection = DriverManager.getConnection(url); - init(connection); + try(Connection connection = DriverManager.getConnection(url)){ + init(connection); - SchemalessWriter writer = new SchemalessWriter(connection, "test_ws_schemaless"); - writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); - writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); - writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS); - System.exit(0); + try(SchemalessWriter writer = new SchemalessWriter(connection, "test_ws_schemaless")){ + writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); + writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); + writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS); + } + } } private static void init(Connection connection) throws SQLException { @@ -991,6 +991,17 @@ while(true) { `poll` obtains one message each time it is run. +#### Assignment subscription Offset + +``` +long position(TopicPartition partition) throws SQLException; +Map position(String topic) throws SQLException; +Map beginningOffsets(String topic) throws SQLException; +Map endOffsets(String topic) throws SQLException; + +void seek(TopicPartition partition, long offset) throws SQLException; +``` + #### Close subscriptions ```java diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index 46800226d7..1588159b57 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -32,25 +32,22 @@ TDengine 的 JDBC 驱动实现尽可能与关系型数据库驱动保持一致 原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 REST 连接支持所有能运行 Java 的平台。 -## 版本支持 +## 版本历史 -请参考[版本支持列表](../#版本支持) - -## 最近更新记录 - -| taos-jdbcdriver 版本 | 主要变化 | -| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | -| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | -| 3.2.0 | 存在连接问题,不推荐使用 | -| 3.1.0 | WebSocket 连接支持订阅功能 | -| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 | -| 3.0.0 | 支持 TDengine 3.0 | -| 2.0.42 | 修在 WebSocket 连接中 wasNull 接口返回值 | -| 2.0.41 | 修正 REST 连接中用户名和密码转码方式 | -| 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 | -| 2.0.38 | JDBC REST 连接增加批量拉取功能 | -| 2.0.37 | 增加对 json tag 支持 | -| 2.0.36 | 增加对 schemaless 写入支持 | +| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 | +| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: | +| 3.2.2 | 新增功能:数据订阅支持 seek 功能。 | 3.0.5.0 及更高版本 | +| 3.2.1 | 新增功能:WebSocket 连接支持 schemaless 与 prepareStatement 写入。变更:consumer poll 返回结果集为 ConsumerRecord,可通过 value() 获取指定结果集数据。 | 3.0.3.0 及更高版本 | +| 3.2.0 | 存在连接问题,不推荐使用 | - | +| 3.1.0 | WebSocket 连接支持订阅功能 | - | +| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 | - | +| 3.0.0 | 支持 TDengine 3.0 | 3.0.0.0 及更高版本 | +| 2.0.42 | 修在 WebSocket 连接中 wasNull 接口返回值 | - | +| 2.0.41 | 修正 REST 连接中用户名和密码转码方式 | - | +| 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 | - | +| 2.0.38 | JDBC REST 连接增加批量拉取功能 | - | +| 2.0.37 | 增加对 json tag 支持 | - | +| 2.0.36 | 增加对 schemaless 写入支持 | - | **注**:REST 连接中增加 `batchfetch` 参数并设置为 true,将开启 WebSocket 连接。 @@ -80,45 +77,47 @@ JDBC 连接器可能报错的错误码包括 4 种: 具体的错误码请参考: -| Error Code | Description | Suggested Actions | -| ---------- | --------------------------------------------------------------- | --------------------------------------------------------------------------------------- | -| 0x2301 | connection already closed | 连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。 | -| 0x2302 | this operation is NOT supported currently! | 当前使用接口不支持,可以更换其他连接方式。 | -| 0x2303 | invalid variables | 参数不合法,请检查相应接口规范,调整参数类型及大小。 | -| 0x2304 | statement is closed | statement 已经关闭,请检查 statement 是否关闭后再次使用,或是连接是否正常。 | -| 0x2305 | resultSet is closed | resultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。 | -| 0x2306 | Batch is empty! | prepareStatement 添加参数后再执行 executeBatch。 | -| 0x2307 | Can not issue data manipulation statements with executeQuery() | 更新操作应该使用 executeUpdate(),而不是 executeQuery()。 | -| 0x2308 | Can not issue SELECT via executeUpdate() | 查询操作应该使用 executeQuery(),而不是 executeUpdate()。 | -| 0x230d | parameter index out of range | 参数越界,请检查参数的合理范围。 | -| 0x230e | connection already closed | 连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。 | -| 0x230f | unknown sql type in tdengine | 请检查 TDengine 支持的 Data Type 类型。 | -| 0x2310 | can't register JDBC-JNI driver | 不能注册 JNI 驱动,请检查 url 是否填写正确。 | -| 0x2312 | url is not set | 请检查 REST 连接 url 是否填写正确。 | -| 0x2314 | numeric value out of range | 请检查获取结果集中数值类型是否使用了正确的接口。 | -| 0x2315 | unknown taos type in tdengine | 在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。 | -| 0x2317 | | REST 连接中使用了错误的请求类型。 | -| 0x2318 | | REST 连接中出现了数据传输异常,请检查网络情况并重试。 | -| 0x2319 | user is required | 创建连接时缺少用户名信息 | -| 0x231a | password is required | 创建连接时缺少密码信息 | -| 0x231c | httpEntity is null, sql: | REST 连接中执行出现异常 | -| 0x2350 | unknown error | 未知异常,请在 github 反馈给开发人员。 | -| 0x2352 | Unsupported encoding | 本地连接下指定了不支持的字符编码集 | -| 0x2353 | internal error of database, please see taoslog for more details | 本地连接执行 prepareStatement 时出现错误,请检查 taos log 进行问题定位。 | -| 0x2354 | JNI connection is NULL | 本地连接执行命令时,Connection 已经关闭。请检查与 TDengine 的连接情况。 | -| 0x2355 | JNI result set is NULL | 本地连接获取结果集,结果集异常,请检查连接情况,并重试。 | -| 0x2356 | invalid num of fields | 本地连接获取结果集的 meta 信息不匹配。 | -| 0x2357 | empty sql string | 填写正确的 SQL 进行执行。 | -| 0x2359 | JNI alloc memory failed, please see taoslog for more details | 本地连接分配内存错误,请检查 taos log 进行问题定位。 | -| 0x2371 | consumer properties must not be null! | 创建订阅时参数为空,请填写正确的参数。 | -| 0x2372 | configs contain empty key, failed to set consumer property | 参数 key 中包含空值,请填写正确的参数。 | -| 0x2373 | failed to set consumer property, | 参数 value 中包含空值,请填写正确的参数。 | -| 0x2375 | topic reference has been destroyed | 创建数据订阅过程中,topic 引用被释放。请检查与 TDengine 的连接情况。 | -| 0x2376 | failed to set consumer topic, topic name is empty | 创建数据订阅过程中,订阅 topic 名称为空。请检查指定的 topic 名称是否填写正确。 | -| 0x2377 | consumer reference has been destroyed | 订阅数据传输通道已经关闭,请检查与 TDengine 的连接情况。 | -| 0x2378 | consumer create error | 创建数据订阅失败,请根据错误信息检查 taos log 进行问题定位。 | -| - | can't create connection with server within | 通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。 | -| - | failed to complete the task within the specified time | 通过增加参数 messageWaitTimeout 增加执行耗时,或是请检查与 taosAdapter 之间的连接情况。 | +| Error Code | Description | Suggested Actions | +| ---------- | --------------------------------------------------------------- | ----------------------------------------------------------------------------------------- | +| 0x2301 | connection already closed | 连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。 | +| 0x2302 | this operation is NOT supported currently! | 当前使用接口不支持,可以更换其他连接方式。 | +| 0x2303 | invalid variables | 参数不合法,请检查相应接口规范,调整参数类型及大小。 | +| 0x2304 | statement is closed | statement 已经关闭,请检查 statement 是否关闭后再次使用,或是连接是否正常。 | +| 0x2305 | resultSet is closed | resultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。 | +| 0x2306 | Batch is empty! | prepareStatement 添加参数后再执行 executeBatch。 | +| 0x2307 | Can not issue data manipulation statements with executeQuery() | 更新操作应该使用 executeUpdate(),而不是 executeQuery()。 | +| 0x2308 | Can not issue SELECT via executeUpdate() | 查询操作应该使用 executeQuery(),而不是 executeUpdate()。 | +| 0x230d | parameter index out of range | 参数越界,请检查参数的合理范围。 | +| 0x230e | connection already closed | 连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。 | +| 0x230f | unknown sql type in tdengine | 请检查 TDengine 支持的 Data Type 类型。 | +| 0x2310 | can't register JDBC-JNI driver | 不能注册 JNI 驱动,请检查 url 是否填写正确。 | +| 0x2312 | url is not set | 请检查 REST 连接 url 是否填写正确。 | +| 0x2314 | numeric value out of range | 请检查获取结果集中数值类型是否使用了正确的接口。 | +| 0x2315 | unknown taos type in tdengine | 在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。 | +| 0x2317 | | REST 连接中使用了错误的请求类型。 | +| 0x2318 | | REST 连接中出现了数据传输异常,请检查网络情况并重试。 | +| 0x2319 | user is required | 创建连接时缺少用户名信息 | +| 0x231a | password is required | 创建连接时缺少密码信息 | +| 0x231c | httpEntity is null, sql: | REST 连接中执行出现异常 | +| 0x231d | can't create connection with server within | 通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。 | +| 0x231e | failed to complete the task within the specified time | 通过增加参数 messageWaitTimeout 增加执行耗时,或是请检查与 taosAdapter 之间的连接情况。 | +| 0x2350 | unknown error | 未知异常,请在 github 反馈给开发人员。 | +| 0x2352 | Unsupported encoding | 本地连接下指定了不支持的字符编码集 | +| 0x2353 | internal error of database, please see taoslog for more details | 本地连接执行 prepareStatement 时出现错误,请检查 taos log 进行问题定位。 | +| 0x2354 | JNI connection is NULL | 本地连接执行命令时,Connection 已经关闭。请检查与 TDengine 的连接情况。 | +| 0x2355 | JNI result set is NULL | 本地连接获取结果集,结果集异常,请检查连接情况,并重试。 | +| 0x2356 | invalid num of fields | 本地连接获取结果集的 meta 信息不匹配。 | +| 0x2357 | empty sql string | 填写正确的 SQL 进行执行。 | +| 0x2359 | JNI alloc memory failed, please see taoslog for more details | 本地连接分配内存错误,请检查 taos log 进行问题定位。 | +| 0x2371 | consumer properties must not be null! | 创建订阅时参数为空,请填写正确的参数。 | +| 0x2372 | configs contain empty key, failed to set consumer property | 参数 key 中包含空值,请填写正确的参数。 | +| 0x2373 | failed to set consumer property, | 参数 value 中包含空值,请填写正确的参数。 | +| 0x2375 | topic reference has been destroyed | 创建数据订阅过程中,topic 引用被释放。请检查与 TDengine 的连接情况。 | +| 0x2376 | failed to set consumer topic, topic name is empty | 创建数据订阅过程中,订阅 topic 名称为空。请检查指定的 topic 名称是否填写正确。 | +| 0x2377 | consumer reference has been destroyed | 订阅数据传输通道已经关闭,请检查与 TDengine 的连接情况。 | +| 0x2378 | consumer create error | 创建数据订阅失败,请根据错误信息检查 taos log 进行问题定位。 | +| 0x2379 | seek offset must not be a negative number | seek 接口参数不能为负值,请使用正确的参数 | +| 0x237a | vGroup not found in result set | VGroup 没有分配给当前 consumer,由于 Rebalance 机制导致 Consumer 与 VGroup 不是绑定的关系 | - [TDengine Java Connector](https://github.com/taosdata/taos-connector-jdbc/blob/main/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java) @@ -169,7 +168,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖: com.taosdata.jdbc taos-jdbcdriver - 3.2.1 + 3.2.2 ``` @@ -916,14 +915,15 @@ public class SchemalessWsTest { public static void main(String[] args) throws SQLException { final String url = "jdbc:TAOS-RS://" + host + ":6041/?user=root&password=taosdata&batchfetch=true"; - Connection connection = DriverManager.getConnection(url); - init(connection); + try(Connection connection = DriverManager.getConnection(url)){ + init(connection); - SchemalessWriter writer = new SchemalessWriter(connection, "test_ws_schemaless"); - writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); - writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); - writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS); - System.exit(0); + try(SchemalessWriter writer = new SchemalessWriter(connection, "test_ws_schemaless")){ + writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO_SECONDS); + writer.write(telnetDemo, SchemalessProtocolType.TELNET, SchemalessTimestampType.MILLI_SECONDS); + writer.write(jsonDemo, SchemalessProtocolType.JSON, SchemalessTimestampType.SECONDS); + } + } } private static void init(Connection connection) throws SQLException { @@ -994,6 +994,17 @@ while(true) { `poll` 每次调用获取一个消息。 +#### 指定订阅 Offset + +``` +long position(TopicPartition partition) throws SQLException; +Map position(String topic) throws SQLException; +Map beginningOffsets(String topic) throws SQLException; +Map endOffsets(String topic) throws SQLException; + +void seek(TopicPartition partition, long offset) throws SQLException; +``` + #### 关闭订阅 ```java diff --git a/examples/JDBC/springbootdemo/src/main/resources/application.properties b/examples/JDBC/springbootdemo/src/main/resources/application.properties index bf21047395..c523952fb6 100644 --- a/examples/JDBC/springbootdemo/src/main/resources/application.properties +++ b/examples/JDBC/springbootdemo/src/main/resources/application.properties @@ -5,7 +5,7 @@ #spring.datasource.password=taosdata # datasource config - JDBC-RESTful spring.datasource.driver-class-name=com.taosdata.jdbc.rs.RestfulDriver -spring.datasource.url=jdbc:TAOS-RS://localhost:6041/test?timezone=UTC-8&charset=UTF-8&locale=en_US.UTF-8 +spring.datasource.url=jdbc:TAOS-RS://localhost:6041/test spring.datasource.username=root spring.datasource.password=taosdata spring.datasource.druid.initial-size=5 diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index e14c4e60d9..07fc2fd71b 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -42,27 +42,27 @@ IF (TD_LINUX) ) target_link_libraries(tmq - taos_static + taos ) target_link_libraries(stream_demo - taos_static + taos ) target_link_libraries(schemaless - taos_static + taos ) target_link_libraries(prepare - taos_static + taos ) target_link_libraries(demo - taos_static + taos ) target_link_libraries(asyncdemo - taos_static + taos ) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index e015f4182e..c92ce254a8 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -163,6 +163,7 @@ typedef struct { int64_t checkPointId; int32_t taskId; int64_t streamId; + int64_t streamBackendRid; } SStreamState; typedef struct SFunctionStateStore { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 51f2de481d..73c88fae8d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -345,7 +345,6 @@ typedef struct SStreamMeta { SRWLatch lock; int32_t walScanCounter; void* streamBackend; - int32_t streamBackendId; int64_t streamBackendRid; SHashObj* pTaskBackendUnique; } SStreamMeta; diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 18cd0d9cc6..683d10e926 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -22,21 +22,20 @@ extern "C" { // If the error is in a third-party library, place this header file under the third-party library header file. // When you want to use this feature, you should find or add the same function in the following sectio -// #if !defined(WINDOWS) +#if !defined(WINDOWS) -// #ifndef ALLOW_FORBID_FUNC -// #define malloc MALLOC_FUNC_TAOS_FORBID -// #define calloc CALLOC_FUNC_TAOS_FORBID -// #define realloc REALLOC_FUNC_TAOS_FORBID -// #define free FREE_FUNC_TAOS_FORBID -// #ifdef strdup -// #undef strdup -// #define strdup STRDUP_FUNC_TAOS_FORBID -// #endif -// #endif // ifndef ALLOW_FORBID_FUNC -// #endif // if !defined(WINDOWS) +#ifndef ALLOW_FORBID_FUNC +#define malloc MALLOC_FUNC_TAOS_FORBID +#define calloc CALLOC_FUNC_TAOS_FORBID +#define realloc REALLOC_FUNC_TAOS_FORBID +#define free FREE_FUNC_TAOS_FORBID +#ifdef strdup +#undef strdup +#define strdup STRDUP_FUNC_TAOS_FORBID +#endif +#endif // ifndef ALLOW_FORBID_FUNC +#endif // if !defined(WINDOWS) -// // #define taosMemoryFree malloc // #define taosMemoryMalloc malloc // #define taosMemoryCalloc calloc // #define taosMemoryRealloc realloc diff --git a/packaging/deb/DEBIAN/preinst b/packaging/deb/DEBIAN/preinst index d6558d5b3b..904a946e20 100644 --- a/packaging/deb/DEBIAN/preinst +++ b/packaging/deb/DEBIAN/preinst @@ -80,5 +80,4 @@ fi # there can not libtaos.so*, otherwise ln -s error ${csudo}rm -f ${install_main_dir}/driver/libtaos.* || : -[ -f ${install_main_dir}/driver/librocksdb.* ] && ${csudo}rm -f ${install_main_dir}/driver/librocksdb.* || : [ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}rm -f ${install_main_dir}/driver/libtaosws.so || : diff --git a/packaging/deb/DEBIAN/prerm b/packaging/deb/DEBIAN/prerm index 8f8d472867..0d63115a04 100644 --- a/packaging/deb/DEBIAN/prerm +++ b/packaging/deb/DEBIAN/prerm @@ -40,7 +40,6 @@ else ${csudo}rm -f ${inc_link_dir}/taosudf.h || : [ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || : - [ -f ${lib_link_dir}/librocksdb.* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : [ -f ${lib_link_dir}/libtaosws.so ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.so || : ${csudo}rm -f ${log_link_dir} || : diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index 024c69deb1..9f49cf345a 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -31,7 +31,6 @@ cd ${pkg_dir} libfile="libtaos.so.${tdengine_ver}" wslibfile="libtaosws.so" -rocksdblib="librocksdb.so.8" # create install dir install_home_path="/usr/local/taos" @@ -95,7 +94,6 @@ fi cp ${compile_dir}/build/bin/taos ${pkg_dir}${install_home_path}/bin cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_path}/driver -[ -f ${compile_dir}/build/lib/${rocksdblib} ] && cp ${compile_dir}/build/lib/${rocksdblib} ${pkg_dir}${install_home_path}/driver ||: [ -f ${compile_dir}/build/lib/${wslibfile} ] && cp ${compile_dir}/build/lib/${wslibfile} ${pkg_dir}${install_home_path}/driver ||: cp ${compile_dir}/../include/client/taos.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../include/common/taosdef.h ${pkg_dir}${install_home_path}/include diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index 2b056c376a..52d5335003 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -45,7 +45,6 @@ echo buildroot: %{buildroot} libfile="libtaos.so.%{_version}" wslibfile="libtaosws.so" -rocksdblib="librocksdb.so.8" # create install path, and cp file mkdir -p %{buildroot}%{homepath}/bin @@ -93,7 +92,6 @@ if [ -f %{_compiledir}/build/bin/taosadapter ]; then fi cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver [ -f %{_compiledir}/build/lib/${wslibfile} ] && cp %{_compiledir}/build/lib/${wslibfile} %{buildroot}%{homepath}/driver ||: -[ -f %{_compiledir}/build/lib/${rocksdblib} ] && cp %{_compiledir}/build/lib/${rocksdblib} %{buildroot}%{homepath}/driver ||: cp %{_compiledir}/../include/client/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/common/taosdef.h %{buildroot}%{homepath}/include cp %{_compiledir}/../include/util/taoserror.h %{buildroot}%{homepath}/include @@ -176,7 +174,6 @@ fi # there can not libtaos.so*, otherwise ln -s error ${csudo}rm -f %{homepath}/driver/libtaos* || : -${csudo}rm -f %{homepath}/driver/librocksdb* || : #Scripts executed after installation %post @@ -222,7 +219,6 @@ if [ $1 -eq 0 ];then ${csudo}rm -f ${inc_link_dir}/taoserror.h || : ${csudo}rm -f ${inc_link_dir}/taosudf.h || : ${csudo}rm -f ${lib_link_dir}/libtaos.* || : - ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : ${csudo}rm -f ${log_link_dir} || : ${csudo}rm -f ${data_link_dir} || : diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 9aa019f218..1b47b10520 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -250,30 +250,18 @@ function install_lib() { # Remove links ${csudo}rm -f ${lib_link_dir}/libtaos.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || : - ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : - ${csudo}rm -f ${lib64_link_dir}/librocksdb.* || : #${csudo}rm -rf ${v15_java_app_dir} || : ${csudo}cp -rf ${script_dir}/driver/* ${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/* ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 ${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so - ${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8 - ${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so - - ${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib_link_dir}/librocksdb.so.8 - ${csudo}ln -sf ${lib_link_dir}/librocksdb.so.8 ${lib_link_dir}/librocksdb.so - - [ -f ${install_main_dir}/driver/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || : if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.so ]]; then ${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 || : ${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so || : - ${csudo}ln -sf ${install_main_dir}/driver/librocksdb.* ${lib64_link_dir}/librocksdb.so.8 || : - ${csudo}ln -sf ${lib64_link_dir}/librocksdb.so.8 ${lib64_link_dir}/librocksdb.so || : - [ -f ${install_main_dir}/libtaosws.so ] && ${csudo}ln -sf ${install_main_dir}/libtaosws.so ${lib64_link_dir}/libtaosws.so || : fi diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index ab45c684c4..b0537e8bcf 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -111,11 +111,9 @@ fi if [ "$osType" == "Darwin" ]; then lib_files="${build_dir}/lib/libtaos.${version}.dylib" wslib_files="${build_dir}/lib/libtaosws.dylib" - rocksdb_lib_files="${build_dir}/lib/librocksdb.dylib.8.1.1" else lib_files="${build_dir}/lib/libtaos.so.${version}" wslib_files="${build_dir}/lib/libtaosws.so" - rocksdb_lib_files="${build_dir}/lib/librocksdb.so.8.1.1" fi header_files="${code_dir}/include/client/taos.h ${code_dir}/include/common/taosdef.h ${code_dir}/include/util/taoserror.h ${code_dir}/include/libs/function/taosudf.h" @@ -338,7 +336,6 @@ fi # Copy driver mkdir -p ${install_dir}/driver && cp ${lib_files} ${install_dir}/driver && echo "${versionComp}" >${install_dir}/driver/vercomp.txt [ -f ${wslib_files} ] && cp ${wslib_files} ${install_dir}/driver || : -[ -f ${rocksdb_lib_files} ] && cp ${rocksdb_lib_files} ${install_dir}/driver || : # Copy connector if [ "$verMode" == "cluster" ]; then diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh index 10de87966f..fc392c9684 100755 --- a/packaging/tools/post.sh +++ b/packaging/tools/post.sh @@ -202,19 +202,10 @@ function install_lib() { log_print "start install lib from ${lib_dir} to ${lib_link_dir}" ${csudo}rm -f ${lib_link_dir}/libtaos* || : ${csudo}rm -f ${lib64_link_dir}/libtaos* || : - - #rocksdb - [ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || : - [ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || : - - #rocksdb - [ -f ${lib_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib_link_dir}/librocksdb* || : - [ -f ${lib64_link_dir}/librocksdb* ] && ${csudo}rm -f ${lib64_link_dir}/librocksdb* || : [ -f ${lib_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.${lib_file_ext} || : [ -f ${lib64_link_dir}/libtaosws.${lib_file_ext} ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.${lib_file_ext} || : - ${csudo}ln -s ${lib_dir}/librocksdb.* ${lib_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_dir}/libtaos.* ${lib_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib_link_dir}/libtaos.${lib_file_ext_1} ${lib_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1 @@ -223,7 +214,6 @@ function install_lib() { if [[ -d ${lib64_link_dir} && ! -e ${lib64_link_dir}/libtaos.${lib_file_ext} ]]; then ${csudo}ln -s ${lib_dir}/libtaos.* ${lib64_link_dir}/libtaos.${lib_file_ext_1} 2>>${install_log_path} || return 1 ${csudo}ln -s ${lib64_link_dir}/libtaos.${lib_file_ext_1} ${lib64_link_dir}/libtaos.${lib_file_ext} 2>>${install_log_path} || return 1 - ${csudo}ln -s ${lib_dir}/librocksdb.* ${lib64_link_dir}/librocksdb.${lib_file_ext_1} 2>>${install_log_path} || return 1 [ -f ${lib_dir}/libtaosws.${lib_file_ext} ] && ${csudo}ln -sf ${lib_dir}/libtaosws.${lib_file_ext} ${lib64_link_dir}/libtaosws.${lib_file_ext} 2>>${install_log_path} fi diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index a17b29983c..be2c26c309 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -142,11 +142,9 @@ function clean_local_bin() { function clean_lib() { # Remove link ${csudo}rm -f ${lib_link_dir}/libtaos.* || : - ${csudo}rm -f ${lib_link_dir}/librocksdb.* || : [ -f ${lib_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib_link_dir}/libtaosws.* || : ${csudo}rm -f ${lib64_link_dir}/libtaos.* || : - ${csudo}rm -f ${lib64_link_dir}/librocksdb.* || : [ -f ${lib64_link_dir}/libtaosws.* ] && ${csudo}rm -f ${lib64_link_dir}/libtaosws.* || : #${csudo}rm -rf ${v15_java_app_dir} || : diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index c238cb38bc..8c80fab076 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -115,7 +115,7 @@ target_link_libraries( # PUBLIC bdb # PUBLIC scalar - PUBLIC rocksdb-shared + PUBLIC rocksdb PUBLIC transport PUBLIC stream PUBLIC index diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 7aac639027..c448ea0160 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -647,6 +647,8 @@ uint64_t calcGroupId(char* pData, int32_t len) { // NOTE: only extract the initial 8 bytes of the final MD5 digest uint64_t id = 0; memcpy(&id, context.digest, sizeof(uint64_t)); + if (0 == id) + memcpy(&id, context.digest + 8, sizeof(uint64_t)); return id; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 442f8162ed..73143fdba7 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -319,6 +319,11 @@ void destroyMergeJoinOperator(void* param) { } nodesDestroyNode(pJoinOperator->pCondAfterMerge); + taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); + taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); + taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); + pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index dde6f7c0e8..e7de826d4b 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -213,6 +213,8 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS } else { if (limitReached && (pLimitInfo->slimit.limit >= 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { setOperatorCompleted(pOperator); + } else if (limitReached && groupId == 0) { + setOperatorCompleted(pOperator); } } diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index fa6c709c8f..d1ef7fe3c1 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -11,7 +11,7 @@ if(${BUILD_WITH_ROCKSDB}) IF (TD_LINUX) target_link_libraries( stream - PUBLIC rocksdb-shared tdb + PUBLIC rocksdb tdb PRIVATE os util transport qcom executor wal index ) ELSE() diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 2c1956998a..c7ee308b61 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -36,8 +36,9 @@ static SStreamGlobalEnv streamEnv; int32_t streamDispatchStreamBlock(SStreamTask* pTask); SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); -SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); -void destroyStreamDataBlock(SStreamDataBlock* pBlock); +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, + SArray* pRes); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); @@ -53,6 +54,8 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); +extern int32_t streamBackendId; + #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index b3995f020b..df045eef20 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -16,7 +16,9 @@ #include "streamBackendRocksdb.h" #include "executor.h" #include "query.h" +#include "streamInc.h" #include "tcommon.h" +#include "tref.h" typedef struct SCompactFilteFactory { void* status; @@ -79,8 +81,8 @@ const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); void* streamBackendInit(const char* path) { - qDebug("init stream backend"); - SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); + qDebug("start to init stream backend at %s", path); + SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->cfMutex, NULL); @@ -119,6 +121,7 @@ void* streamBackendInit(const char* path) { if (err != NULL) { qError("failed to open rocksdb, path:%s, reason:%s", path, err); taosMemoryFreeClear(err); + goto _EXIT; } } else { /* @@ -129,6 +132,7 @@ void* streamBackendInit(const char* path) { if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); } + qDebug("succ to init stream backend at %s, backend:%p", path, pHandle); return (void*)pHandle; _EXIT: @@ -140,7 +144,8 @@ _EXIT: taosHashCleanup(pHandle->cfInst); rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); tdListFree(pHandle->list); - free(pHandle); + taosMemoryFree(pHandle); + qDebug("failed to init stream backend at %s", path); return NULL; } void streamBackendCleanup(void* arg) { @@ -168,19 +173,20 @@ void streamBackendCleanup(void* arg) { rocksdb_env_destroy(pHandle->env); rocksdb_cache_destroy(pHandle->cache); - taosThreadMutexDestroy(&pHandle->mutex); SListNode* head = tdListPopHead(pHandle->list); while (head != NULL) { streamStateDestroyCompar(head->data); taosMemoryFree(head); head = tdListPopHead(pHandle->list); } - // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); + tdListFree(pHandle->list); + taosThreadMutexDestroy(&pHandle->mutex); + taosThreadMutexDestroy(&pHandle->cfMutex); taosMemoryFree(pHandle); - + qDebug("destroy stream backend backend:%p", pHandle); return; } SListNode* streamBackendAddCompare(void* backend, void* arg) { @@ -803,7 +809,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - qInfo("start to open backend, %p 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); + taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendHandle* handle = backend; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); @@ -866,7 +873,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); // rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); - qInfo("succ to open backend, %p, 0x%" PRIx64 "-%d", pState, pState->streamId, pState->taskId); + qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId); return 0; } @@ -882,8 +889,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadMutexUnlock(&pHandle->cfMutex); char* status[] = {"close", "drop"}; - qInfo("start to %s backend, %p, 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pState->streamId, - pState->taskId); + qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle, + pState->streamId, pState->taskId); if (pState->pTdbState->rocksdb == NULL) { return; } @@ -938,6 +945,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadRwlockDestroy(&pState->pTdbState->rwLock); pState->pTdbState->rocksdb = NULL; + taosReleaseRef(streamBackendId, pState->streamBackendRid); } void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8c26052fdb..ed9f99cf78 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -20,7 +20,7 @@ #include "ttimer.h" static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; -static int32_t streamBackendId = 0; +int32_t streamBackendId = 0; static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } @@ -79,7 +79,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->vgId = vgId; pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - pMeta->streamBackendId = streamBackendId; memset(streamPath, 0, len); sprintf(streamPath, "%s/%s", pMeta->path, "state"); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 71a21ac150..967c7733c9 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -106,7 +106,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz } SStreamTask* pStreamTask = pTask; - char statePath[1024]; + char statePath[1024]; if (!specPath) { sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); } else { @@ -119,10 +119,10 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; - taosAcquireRef(pMeta->streamBackendId, pMeta->streamBackendRid); + pState->streamBackendRid = pMeta->streamBackendRid; int code = streamStateOpenBackend(pMeta->streamBackend, pState); if (code == -1) { - taosReleaseRef(pMeta->streamBackendId, pMeta->streamBackendRid); + taosReleaseRef(streamBackendId, pMeta->streamBackendRid); taosMemoryFree(pState); pState = NULL; } @@ -222,9 +222,7 @@ _err: void streamStateClose(SStreamState* pState, bool remove) { SStreamTask* pTask = pState->pTdbState->pOwner; #ifdef USE_ROCKSDB - // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -278,10 +276,10 @@ int32_t streamStateCommit(SStreamState* pState) { int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); memcpy(buf + len - rowSize, value, vLen); return code; @@ -291,10 +289,10 @@ int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* } int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - void* pVal = NULL; - int32_t len = 0; - int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); - char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); *ppVal = buf + len - rowSize; return code; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index bc84509728..bfaeca89f6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -419,7 +419,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0 || len == 0 || val == NULL) { return TSDB_CODE_FAILED; } - memcpy(val, buf, len); + memcpy(buf, val, len); buf[len] = 0; maxCheckPointId = atol((char*)buf); taosMemoryFree(val); @@ -433,7 +433,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (code != 0) { return TSDB_CODE_FAILED; } - memcpy(val, buf, len); + memcpy(buf, val, len); buf[len] = 0; taosMemoryFree(val); diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 87b0d11d1c..71dfd710a5 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -9,35 +9,35 @@ add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) target_link_libraries( tmq_offset - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( create_table - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( tmq_demo - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( tmq_sim - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os ) target_link_libraries( tmq_taosx_ci - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os @@ -45,7 +45,7 @@ target_link_libraries( target_link_libraries( write_raw_block_test - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os @@ -53,7 +53,7 @@ target_link_libraries( target_link_libraries( sml_test - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os @@ -61,7 +61,7 @@ target_link_libraries( target_link_libraries( get_db_name_test - PUBLIC taos_static + PUBLIC taos PUBLIC util PUBLIC common PUBLIC os