Merge pull request #29623 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch
This commit is contained in:
Shengliang Guan 2025-01-22 16:36:17 +08:00 committed by GitHub
commit 8c0600649f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 396 additions and 73 deletions

View File

@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "3.3.5.0.alpha") SET(TD_VER_NUMBER "3.3.5.2.alpha")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)

View File

@ -26,7 +26,8 @@ Flink Connector supports all platforms that can run Flink 1.19 and above version
| Flink Connector Version | Major Changes | TDengine Version| | Flink Connector Version | Major Changes | TDengine Version|
|-------------------------| ------------------------------------ | ---------------- | |-------------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1.Support SQL queries on data in TDengine database. <br/> 2. Support CDC subscription to data in TDengine database.<br/> 3. Supports reading and writing to TDengine database using Table SQL. | 3.3.5.0 and higher| | 2.0.1 | Sink supports writing types from Rowdata implementations.| - |
| 2.0.0 | 1.Support SQL queries on data in TDengine database. <br/> 2. Support CDC subscription to data in TDengine database.<br/> 3. Supports reading and writing to TDengine database using Table SQL. | 3.3.5.1 and higher|
| 1.0.0 | Support Sink function to write data from other sources to TDengine in the future.| 3.3.2.0 and higher| | 1.0.0 | Support Sink function to write data from other sources to TDengine in the future.| 3.3.2.0 and higher|
## Exception and error codes ## Exception and error codes
@ -114,7 +115,7 @@ If using Maven to manage a project, simply add the following dependencies in pom
<dependency> <dependency>
<groupId>com.taosdata.flink</groupId> <groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId> <artifactId>flink-connector-tdengine</artifactId>
<version>2.0.0</version> <version>2.0.1</version>
</dependency> </dependency>
``` ```

View File

@ -25,6 +25,10 @@ Download links for TDengine 3.x version installation packages are as follows:
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.3.5.2
<Release type="tdengine" version="3.3.5.2" />
## 3.3.5.0 ## 3.3.5.0
<Release type="tdengine" version="3.3.5.0" /> <Release type="tdengine" version="3.3.5.0" />

View File

@ -0,0 +1,43 @@
---
title: TDengine 3.3.5.2 Release Notes
sidebar_label: 3.3.5.2
description: Version 3.3.5.2 Notes
slug: /release-history/release-notes/3.3.5.2
---
## Features
1. feat: taosX now support multiple stables with template for MQTT
## Enhancements
1. enh: improve taosX error message if database is invalid
2. enh: use poetry group depencencies and reduce dep when install [#251](https://github.com/taosdata/taos-connector-python/issues/251)
3. enh: improve backup restore using taosX
4. enh: during the multi-level storage data migration, if the migration time is too long, it may cause the Vnode to switch leader
5. enh: adjust the systemctl strategy for managing the taosd process, if three consecutive restarts fail within 60 seconds, the next restart will be delayed until 900 seconds later
## Fixes
1. fix: the maxRetryWaitTime parameter is used to control the maximum reconnection timeout time for the client when the cluster is unable to provide services, but it does not take effect when encountering a Sync timeout error
2. fix: supports immediate subscription to the new tag value after modifying the tag value of the sub-table
3. fix: the tmq_consumer_poll function for data subscription does not return an error code when the call fails
4. fix: taosd may crash when more than 100 views are created and the show views command is executed
5. fix: when using stmt2 to insert data, if not all data columns are bound, the insertion operation will fail
6. fix: when using stmt2 to insert data, if the database name or table name is enclosed in backticks, the insertion operation will fail
7. fix: when closing a vnode, if there are ongoing file merge tasks, taosd may crash
8. fix: frequent execution of the “drop table with tb_uid” statement may lead to a deadlock in taosd
9. fix: the potential deadlock during the switching of log files
10. fix: prohibit the creation of databases with the same names as system databases (information_schema, performance_schema)
11. fix: when the inner query of a nested query come from a super table, the sorting information cannot be pushed up
12. fix: incorrect error reporting when attempting to write Geometry data types that do not conform to topological specifications through the STMT interface
13. fix: when using the percentile function and session window in a query statement, if an error occurs, taosd may crash
14. fix: the issue of being unable to dynamically modify system parameters
15. fix: random error of tranlict transaction in replication
16. fix: the same consumer executes the unsubscribe operation and immediately attempts to subscribe to other different topics, the subscription API will return an error
17. fix: fix CVE-2022-28948 security issue in go connector
18. fix: when a subquery in a view contains an ORDER BY clause with an alias, and the query function itself also has an alias, querying the view will result in an error
19. fix: when changing the database from a single replica to a mulit replica, if there are some metadata generated by earlier versions that are no longer used in the new version, the modification operation will fail
20. fix: column names were not correctly copied when using SELECT * FROM subqueries
21. fix: when performing max/min function on string type data, the results are inaccurate and taosd will crash
22. fix: stream computing does not support the use of the HAVING clause, but no error is reported during creation
23. fix: the version information displayed by taos shell for the server is inaccurate, such as being unable to correctly distinguish between the community edition and the enterprise edition
24. fix: in certain specific query scenarios, when JOIN and CAST are used together, taosd may crash

View File

@ -5,6 +5,7 @@ slug: /release-history/release-notes
[3.3.5.0](./3-3-5-0/) [3.3.5.0](./3-3-5-0/)
[3.3.5.2](./3.3.5.2)
[3.3.4.8](./3-3-4-8/) [3.3.4.8](./3-3-4-8/)
[3.3.4.3](./3-3-4-3/) [3.3.4.3](./3-3-4-3/)

View File

@ -263,7 +263,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class; Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters"); SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass); TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass);
DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source"); DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> { DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
Iterator<RowData> iterator = records.iterator(); Iterator<RowData> iterator = records.iterator();
@ -304,7 +304,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class); TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source"); DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> { DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("tsxx: " + rowData.getTimestamp(0, 0) + sb.append("tsxx: " + rowData.getTimestamp(0, 0) +
@ -343,7 +343,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class; Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass); TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source"); DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> { DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> {
Iterator<ConsumerRecord<RowData>> iterator = records.iterator(); Iterator<ConsumerRecord<RowData>> iterator = records.iterator();
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -388,7 +388,7 @@ splitSql.setSelect("ts, current, voltage, phase, groupid, location")
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer");
config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class); TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class);
DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "kafka-source"); DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");
DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> { DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("ts: " + rowData.getTs() + sb.append("ts: " + rowData.getTs() +

View File

@ -24,7 +24,8 @@ Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。
## 版本历史 ## 版本历史
| Flink Connector 版本 | 主要变化 | TDengine 版本 | | Flink Connector 版本 | 主要变化 | TDengine 版本 |
| ------------------| ------------------------------------ | ---------------- | | ------------------| ------------------------------------ | ---------------- |
| 2.0.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据<br/> 2. 支持 CDC 订阅 TDengine 数据库中的数据<br/> 3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.0 及以上版本 | | 2.0.1 | Sink 支持对所有继承自 RowData 并已实现的类型进行数据写入| - |
| 2.0.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据<br/> 2. 支持 CDC 订阅 TDengine 数据库中的数据<br/> 3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.1 及以上版本 |
| 1.0.0 | 支持 Sink 功能,将来着其他数据源的数据写入到 TDengine| 3.3.2.0 及以上版本| | 1.0.0 | 支持 Sink 功能,将来着其他数据源的数据写入到 TDengine| 3.3.2.0 及以上版本|
## 异常和错误码 ## 异常和错误码
@ -111,7 +112,7 @@ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
<dependency> <dependency>
<groupId>com.taosdata.flink</groupId> <groupId>com.taosdata.flink</groupId>
<artifactId>flink-connector-tdengine</artifactId> <artifactId>flink-connector-tdengine</artifactId>
<version>2.0.0</version> <version>2.0.1</version>
</dependency> </dependency>
``` ```

View File

@ -24,6 +24,10 @@ TDengine 3.x 各版本安装包下载链接如下:
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.3.5.2
<Release type="tdengine" version="3.3.5.2" />
## 3.3.5.0 ## 3.3.5.0
<Release type="tdengine" version="3.3.5.0" /> <Release type="tdengine" version="3.3.5.0" />

View File

@ -0,0 +1,42 @@
---
title: 3.3.5.2 版本说明
sidebar_label: 3.3.5.2
description: 3.3.5.2 版本说明
---
## 特性
1. 特性taosX MQTT 数据源支持根据模板创建多个超级表
## 优化
1. 优化:改进 taosX 数据库不可用时的错误信息
2. 优化:使用 Poetry 标准管理依赖项并减少 Python 连接器安装依赖项 [#251](https://github.com/taosdata/taos-connector-python/issues/251)
3. 优化taosX 增量备份和恢复优化
4. 优化:在多级存储数据迁移过程中,如果迁移时间过长,可能会导致 Vnode 切主
5. 优化:调整 systemctl 守护 taosd 进程的策略,如果 60 秒内连续三次重启失败,下次重启将推迟至 900 秒后
## 修复
1. 修复maxRetryWaitTime 参数用于控制当集群无法提供服务时客户端的最大重连超时时间,但在遇到 Sync timeout 错误时,该参数不生效
2. 修复:支持在修改子表的 tag 值后,即时订阅到更新后的 tag 值
3. 修复:数据订阅的 tmq_consumer_poll 函数调用失败时没有返回错误码
4. 修复:当创建超过 100 个视图并执行 show views 命令时taosd 可能会发生崩溃
5. 修复:当使用 stmt2 写入数据时,如果未绑定所有的数据列,写入操作将会失败
6. 修复:当使用 stmt2 写入数据时,如果数据库名或表名使用了反引号,写入操作将会失败
7. 修复:关闭 vnode 时如果有正在进行的文件合并任务taosd 可能会崩溃
8. 修复:频繁执行 drop table with `tb_uid` 语句可能导致 taosd 死锁
9. 修复:日志文件切换过程中可能出现的死锁问题
10. 修复禁止创建与系统库information_schema, performance_schema同名的数据库
11. 修复:当嵌套查询的内层查询来源于超级表时,排序信息无法被上推
12. 修复:通过 STMT 接口尝试写入不符合拓扑规范的 Geometry 数据类型时误报错误
13. 修复:在查询语句中使用 percentile 函数和会话窗口时如果出现错误taosd 可能会崩溃
14. 修复:无法动态修改系统参数的问题
15. 修复:订阅同步偶发 Translict transaction 错误
16. 修复:同一消费者在执行取消订阅操作后,立即尝试订阅其他不同的主题时,会返回错误
17. 修复Go 连接器安全修复 CVE-2022-28948
18. 修复:当视图中的子查询包含带别名的 ORDER BY 子句,并且查询函数自身也带有别名时,查询该视图会引发错误
19. 修复:在将数据库从单副本修改为多副本时,如果存在一些由较早版本生成且在新版本中已不再使用的元数据,会导致修改操作失败
20. 修复:在使用 SELECT * FROM 子查询时,列名未能正确复制到外层查询
21. 修复:对字符串类型数据执行 max/min 函数时,结果不准确且 taosd 可能会崩溃
22. 修复:流式计算不支持使用 HAVING 语句,但在创建时未报告错误
23. 修复taos shell 显示的服务端版本信息不准确,例如无法正确区分社区版和企业版
24. 修复:在某些特定的查询场景下,当 JOIN 和 CAST 联合使用时taosd 可能会崩溃

View File

@ -4,6 +4,7 @@ sidebar_label: 版本说明
description: 各版本版本说明 description: 各版本版本说明
--- ---
[3.3.5.2](./3.3.5.2)
[3.3.5.0](./3.3.5.0) [3.3.5.0](./3.3.5.0)
[3.3.4.8](./3.3.4.8) [3.3.4.8](./3.3.4.8)
[3.3.4.3](./3.3.4.3) [3.3.4.3](./3.3.4.3)

View File

@ -34,6 +34,9 @@ extern "C" {
#define GLOBAL_CONFIG_FILE_VERSION 1 #define GLOBAL_CONFIG_FILE_VERSION 1
#define LOCAL_CONFIG_FILE_VERSION 1 #define LOCAL_CONFIG_FILE_VERSION 1
#define RPC_MEMORY_USAGE_RATIO 0.1
#define QUEUE_MEMORY_USAGE_RATIO 0.6
typedef enum { typedef enum {
DND_CA_SM4 = 1, DND_CA_SM4 = 1,
} EEncryptAlgor; } EEncryptAlgor;
@ -110,6 +113,7 @@ extern int32_t tsNumOfQnodeFetchThreads;
extern int32_t tsNumOfSnodeStreamThreads; extern int32_t tsNumOfSnodeStreamThreads;
extern int32_t tsNumOfSnodeWriteThreads; extern int32_t tsNumOfSnodeWriteThreads;
extern int64_t tsQueueMemoryAllowed; extern int64_t tsQueueMemoryAllowed;
extern int64_t tsApplyMemoryAllowed;
extern int32_t tsRetentionSpeedLimitMB; extern int32_t tsRetentionSpeedLimitMB;
extern int32_t tsNumOfCompactThreads; extern int32_t tsNumOfCompactThreads;

View File

@ -55,6 +55,7 @@ typedef struct {
typedef enum { typedef enum {
DEF_QITEM = 0, DEF_QITEM = 0,
RPC_QITEM = 1, RPC_QITEM = 1,
APPLY_QITEM = 2,
} EQItype; } EQItype;
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem); typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);

View File

@ -1776,7 +1776,9 @@ _return:
} }
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) { int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
int code = 0;
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get param num"); STMT_DLOG_E("start to get param num");
@ -1784,7 +1786,7 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return pStmt->errCode; return pStmt->errCode;
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
@ -1796,23 +1798,29 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERRI_JRET(stmtParseSql(pStmt));
} }
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
*nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues); *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
} else { } else {
STMT_ERR_RET(stmtFetchColFields(stmt, nums, NULL)); STMT_ERRI_JRET(stmtFetchColFields(stmt, nums, NULL));
} }
return TSDB_CODE_SUCCESS; _return:
pStmt->errCode = preCode;
return code;
} }
int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) { int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
int code = 0;
STscStmt* pStmt = (STscStmt*)stmt; STscStmt* pStmt = (STscStmt*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get param"); STMT_DLOG_E("start to get param");
@ -1821,10 +1829,10 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
} }
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
@ -1836,27 +1844,29 @@ int stmtGetParam(TAOS_STMT* stmt, int idx, int* type, int* bytes) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERRI_JRET(stmtParseSql(pStmt));
} }
int32_t nums = 0; int32_t nums = 0;
TAOS_FIELD_E* pField = NULL; TAOS_FIELD_E* pField = NULL;
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField)); STMT_ERRI_JRET(stmtFetchColFields(stmt, &nums, &pField));
if (idx >= nums) { if (idx >= nums) {
tscError("idx %d is too big", idx); tscError("idx %d is too big", idx);
taosMemoryFree(pField); STMT_ERRI_JRET(TSDB_CODE_INVALID_PARA);
STMT_ERR_RET(TSDB_CODE_INVALID_PARA);
} }
*type = pField[idx].type; *type = pField[idx].type;
*bytes = pField[idx].bytes; *bytes = pField[idx].bytes;
taosMemoryFree(pField); _return:
return TSDB_CODE_SUCCESS; taosMemoryFree(pField);
pStmt->errCode = preCode;
return code;
} }
TAOS_RES* stmtUseResult(TAOS_STMT* stmt) { TAOS_RES* stmtUseResult(TAOS_STMT* stmt) {

View File

@ -1074,13 +1074,16 @@ static int stmtFetchColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_E
} }
static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) { static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIELD_ALL** fields) {
int32_t code = 0;
int32_t preCode = pStmt->errCode;
if (pStmt->errCode != TSDB_CODE_SUCCESS) { if (pStmt->errCode != TSDB_CODE_SUCCESS) {
return pStmt->errCode; return pStmt->errCode;
} }
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds"); tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR); STMT_ERRI_JRET(TSDB_CODE_TSC_STMT_API_ERROR);
} }
STableDataCxt** pDataBlock = NULL; STableDataCxt** pDataBlock = NULL;
@ -1092,21 +1095,25 @@ static int stmtFetchStbColFields2(STscStmt2* pStmt, int32_t* fieldNum, TAOS_FIEL
(STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)); (STableDataCxt**)taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if (NULL == pDataBlock) { if (NULL == pDataBlock) {
tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName); tscError("table %s not found in exec blockHash", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_APP_ERROR); STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
} }
} }
STMT_ERR_RET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields)); STMT_ERRI_JRET(qBuildStmtStbColFields(*pDataBlock, pStmt->bInfo.boundTags, pStmt->bInfo.preCtbname, fieldNum, fields));
if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) { if (pStmt->bInfo.tbType == TSDB_SUPER_TABLE) {
pStmt->bInfo.needParse = true; pStmt->bInfo.needParse = true;
qDestroyStmtDataBlock(*pDataBlock); qDestroyStmtDataBlock(*pDataBlock);
if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) { if (taosHashRemove(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName)) != 0) {
tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName); tscError("get fileds %s remove exec blockHash fail", pStmt->bInfo.tbFName);
STMT_ERR_RET(TSDB_CODE_APP_ERROR); STMT_ERRI_JRET(TSDB_CODE_APP_ERROR);
} }
} }
return TSDB_CODE_SUCCESS; _return:
pStmt->errCode = preCode;
return code;
} }
/* /*
SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) { SArray* stmtGetFreeCol(STscStmt2* pStmt, int32_t* idx) {
@ -1839,7 +1846,7 @@ int stmtParseColFields2(TAOS_STMT2* stmt) {
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) { if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest); taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
} }
STMT_ERRI_JRET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
@ -1865,7 +1872,9 @@ int stmtGetStbColFields2(TAOS_STMT2* stmt, int* nums, TAOS_FIELD_ALL** fields) {
} }
int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) { int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
int32_t code = 0;
STscStmt2* pStmt = (STscStmt2*)stmt; STscStmt2* pStmt = (STscStmt2*)stmt;
int32_t preCode = pStmt->errCode;
STMT_DLOG_E("start to get param num"); STMT_DLOG_E("start to get param num");
@ -1873,7 +1882,7 @@ int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
return pStmt->errCode; return pStmt->errCode;
} }
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS)); STMT_ERRI_JRET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 && if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) { STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
@ -1885,19 +1894,23 @@ int stmtGetParamNum2(TAOS_STMT2* stmt, int* nums) {
pStmt->exec.pRequest = NULL; pStmt->exec.pRequest = NULL;
} }
STMT_ERR_RET(stmtCreateRequest(pStmt)); STMT_ERRI_JRET(stmtCreateRequest(pStmt));
if (pStmt->bInfo.needParse) { if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt)); STMT_ERRI_JRET(stmtParseSql(pStmt));
} }
if (STMT_TYPE_QUERY == pStmt->sql.type) { if (STMT_TYPE_QUERY == pStmt->sql.type) {
*nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues); *nums = taosArrayGetSize(pStmt->sql.pQuery->pPlaceholderValues);
} else { } else {
STMT_ERR_RET(stmtFetchColFields2(stmt, nums, NULL)); STMT_ERRI_JRET(stmtFetchColFields2(stmt, nums, NULL));
} }
return TSDB_CODE_SUCCESS; _return:
pStmt->errCode = preCode;
return code;
} }
TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) { TAOS_RES* stmtUseResult2(TAOS_STMT2* stmt) {

View File

@ -735,7 +735,7 @@ TEST(stmt2Case, insert_ntb_get_fields_Test) {
{ {
const char* sql = "insert into stmt2_testdb_4.? values(?,?)"; const char* sql = "insert into stmt2_testdb_4.? values(?,?)";
printf("case 2 : %s\n", sql); printf("case 2 : %s\n", sql);
getFieldsError(taos, sql, TSDB_CODE_PAR_TABLE_NOT_EXIST); getFieldsError(taos, sql, TSDB_CODE_TSC_STMT_TBNAME_ERROR);
} }
// case 3 : wrong para nums // case 3 : wrong para nums
@ -1496,8 +1496,51 @@ TEST(stmt2Case, geometry) {
checkError(stmt, code); checkError(stmt, code);
ASSERT_EQ(affected_rows, 3); ASSERT_EQ(affected_rows, 3);
// test wrong wkb input
unsigned char wkb2[3][61] = {
{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0xF0, 0x3F, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
},
{0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f},
{0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40}};
params[1].buffer = wkb2;
code = taos_stmt2_bind_param(stmt, &bindv, -1);
ASSERT_EQ(code, TSDB_CODE_FUNC_FUNTION_PARA_VALUE);
taos_stmt2_close(stmt); taos_stmt2_close(stmt);
do_query(taos, "DROP DATABASE IF EXISTS stmt2_testdb_13"); do_query(taos, "DROP DATABASE IF EXISTS stmt2_testdb_13");
taos_close(taos); taos_close(taos);
} }
// TD-33582
TEST(stmt2Case, errcode) {
TAOS* taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
do_query(taos, "DROP DATABASE IF EXISTS stmt2_testdb_14");
do_query(taos, "CREATE DATABASE IF NOT EXISTS stmt2_testdb_14");
do_query(taos, "use stmt2_testdb_14");
TAOS_STMT2_OPTION option = {0};
TAOS_STMT2* stmt = taos_stmt2_init(taos, &option);
ASSERT_NE(stmt, nullptr);
char* sql = "select * from t where ts > ? and name = ? foo = ?";
int code = taos_stmt2_prepare(stmt, sql, 0);
checkError(stmt, code);
int fieldNum = 0;
TAOS_FIELD_ALL* pFields = NULL;
code = taos_stmt2_get_fields(stmt, &fieldNum, &pFields);
ASSERT_EQ(code, TSDB_CODE_PAR_SYNTAX_ERROR);
// get fail dont influence the next stmt prepare
sql = "nsert into ? (ts, name) values (?, ?)";
code = taos_stmt_prepare(stmt, sql, 0);
checkError(stmt, code);
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop

View File

@ -212,15 +212,6 @@ void insertData(TAOS *taos, TAOS_STMT_OPTIONS *option, const char *sql, int CTB_
void getFields(TAOS *taos, const char *sql, int expectedALLFieldNum, TAOS_FIELD_E *expectedTagFields, void getFields(TAOS *taos, const char *sql, int expectedALLFieldNum, TAOS_FIELD_E *expectedTagFields,
int expectedTagFieldNum, TAOS_FIELD_E *expectedColFields, int expectedColFieldNum) { int expectedTagFieldNum, TAOS_FIELD_E *expectedColFields, int expectedColFieldNum) {
// create database and table
do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_3");
do_query(taos, "CREATE DATABASE IF NOT EXISTS stmt_testdb_3");
do_query(taos, "USE stmt_testdb_3");
do_query(
taos,
"CREATE STABLE IF NOT EXISTS stmt_testdb_3.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
TAOS_STMT *stmt = taos_stmt_init(taos); TAOS_STMT *stmt = taos_stmt_init(taos);
ASSERT_NE(stmt, nullptr); ASSERT_NE(stmt, nullptr);
int code = taos_stmt_prepare(stmt, sql, 0); int code = taos_stmt_prepare(stmt, sql, 0);
@ -267,6 +258,24 @@ void getFields(TAOS *taos, const char *sql, int expectedALLFieldNum, TAOS_FIELD_
taos_stmt_close(stmt); taos_stmt_close(stmt);
} }
void getFieldsError(TAOS *taos, const char *sql, int expectedErrocode) {
TAOS_STMT *stmt = taos_stmt_init(taos);
ASSERT_NE(stmt, nullptr);
STscStmt *pStmt = (STscStmt *)stmt;
int code = taos_stmt_prepare(stmt, sql, 0);
int fieldNum = 0;
TAOS_FIELD_E *pFields = NULL;
code = taos_stmt_get_tag_fields(stmt, &fieldNum, &pFields);
ASSERT_EQ(code, expectedErrocode);
ASSERT_EQ(pStmt->errCode, TSDB_CODE_SUCCESS);
taosMemoryFree(pFields);
taos_stmt_close(stmt);
}
} // namespace } // namespace
int main(int argc, char **argv) { int main(int argc, char **argv) {
@ -298,6 +307,15 @@ TEST(stmtCase, get_fields) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
// create database and table
do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_3");
do_query(taos, "CREATE DATABASE IF NOT EXISTS stmt_testdb_3");
do_query(taos, "USE stmt_testdb_3");
do_query(
taos,
"CREATE STABLE IF NOT EXISTS stmt_testdb_3.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
// nomarl test
{ {
TAOS_FIELD_E tagFields[2] = {{"groupid", TSDB_DATA_TYPE_INT, 0, 0, sizeof(int)}, TAOS_FIELD_E tagFields[2] = {{"groupid", TSDB_DATA_TYPE_INT, 0, 0, sizeof(int)},
{"location", TSDB_DATA_TYPE_BINARY, 0, 0, 24}}; {"location", TSDB_DATA_TYPE_BINARY, 0, 0, 24}};
@ -307,6 +325,12 @@ TEST(stmtCase, get_fields) {
{"phase", TSDB_DATA_TYPE_FLOAT, 0, 0, sizeof(float)}}; {"phase", TSDB_DATA_TYPE_FLOAT, 0, 0, sizeof(float)}};
getFields(taos, "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)", 7, &tagFields[0], 2, &colFields[0], 4); getFields(taos, "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)", 7, &tagFields[0], 2, &colFields[0], 4);
} }
// error case [TD-33570]
{ getFieldsError(taos, "INSERT INTO ? VALUES (?,?,?,?)", TSDB_CODE_TSC_STMT_TBNAME_ERROR); }
{ getFieldsError(taos, "INSERT INTO ? USING meters TAGS(?,?) VALUES (?,?,?,?)", TSDB_CODE_TSC_STMT_TBNAME_ERROR); }
do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_3"); do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_3");
taos_close(taos); taos_close(taos);
} }
@ -520,9 +544,6 @@ TEST(stmtCase, geometry) {
int code = taos_stmt_prepare(stmt, stmt_sql, 0); int code = taos_stmt_prepare(stmt, stmt_sql, 0);
checkError(stmt, code); checkError(stmt, code);
// code = taos_stmt_set_tbname(stmt, "tb1");
// checkError(stmt, code);
code = taos_stmt_bind_param_batch(stmt, params); code = taos_stmt_bind_param_batch(stmt, params);
checkError(stmt, code); checkError(stmt, code);
@ -532,11 +553,58 @@ TEST(stmtCase, geometry) {
code = taos_stmt_execute(stmt); code = taos_stmt_execute(stmt);
checkError(stmt, code); checkError(stmt, code);
//test wrong wkb input
unsigned char wkb2[3][61] = {
{
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0xF0, 0x3F, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40,
},
{0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f},
{0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40}};
params[1].buffer = wkb2;
code = taos_stmt_bind_param_batch(stmt, params);
ASSERT_EQ(code, TSDB_CODE_FUNC_FUNTION_PARA_VALUE);
taosMemoryFree(t64_len); taosMemoryFree(t64_len);
taosMemoryFree(wkb_len); taosMemoryFree(wkb_len);
taos_stmt_close(stmt); taos_stmt_close(stmt);
do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_5"); do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_5");
taos_close(taos); taos_close(taos);
} }
//TD-33582
TEST(stmtCase, errcode) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr);
do_query(taos, "DROP DATABASE IF EXISTS stmt_testdb_4");
do_query(taos, "CREATE DATABASE IF NOT EXISTS stmt_testdb_4");
do_query(taos, "USE stmt_testdb_4");
do_query(
taos,
"CREATE STABLE IF NOT EXISTS stmt_testdb_4.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
TAOS_STMT *stmt = taos_stmt_init(taos);
ASSERT_NE(stmt, nullptr);
char *sql = "select * from t where ts > ? and name = ? foo = ?";
int code = taos_stmt_prepare(stmt, sql, 0);
checkError(stmt, code);
int fieldNum = 0;
TAOS_FIELD_E *pFields = NULL;
code = stmtGetParamNum(stmt, &fieldNum);
ASSERT_EQ(code, TSDB_CODE_PAR_SYNTAX_ERROR);
code = taos_stmt_get_tag_fields(stmt, &fieldNum, &pFields);
ASSERT_EQ(code, TSDB_CODE_PAR_SYNTAX_ERROR);
// get fail dont influence the next stmt prepare
sql = "nsert into ? (ts, name) values (?, ?)";
code = taos_stmt_prepare(stmt, sql, 0);
checkError(stmt, code);
}
#pragma GCC diagnostic pop #pragma GCC diagnostic pop

View File

@ -500,7 +500,9 @@ int32_t taosSetS3Cfg(SConfig *pCfg) {
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
struct SConfig *taosGetCfg() { return tsCfg; } struct SConfig *taosGetCfg() {
return tsCfg;
}
static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile, static int32_t taosLoadCfg(SConfig *pCfg, const char **envCmd, const char *inputCfgDir, const char *envFile,
char *apolloUrl) { char *apolloUrl) {
@ -818,8 +820,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeWriteThreads = tsNumOfCores / 4; tsNumOfSnodeWriteThreads = tsNumOfCores / 4;
tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4); tsNumOfSnodeWriteThreads = TRANGE(tsNumOfSnodeWriteThreads, 2, 4);
tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsQueueMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * QUEUE_MEMORY_USAGE_RATIO;
tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); tsQueueMemoryAllowed = TRANGE(tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10LL,
TSDB_MAX_MSG_SIZE * QUEUE_MEMORY_USAGE_RATIO * 10000LL);
tsApplyMemoryAllowed = tsTotalMemoryKB * 1024 * RPC_MEMORY_USAGE_RATIO * (1 - QUEUE_MEMORY_USAGE_RATIO);
tsApplyMemoryAllowed = TRANGE(tsApplyMemoryAllowed, TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10LL,
TSDB_MAX_MSG_SIZE * (1 - QUEUE_MEMORY_USAGE_RATIO) * 10000LL);
tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1; tsLogBufferMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL); tsLogBufferMemoryAllowed = TRANGE(tsLogBufferMemoryAllowed, TSDB_MAX_MSG_SIZE * 10LL, TSDB_MAX_MSG_SIZE * 10000LL);
@ -857,7 +864,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeStreamThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeWriteThreads, 2, 1024, CFG_SCOPE_SERVER, CFG_DYN_SERVER_LAZY,CFG_CATEGORY_LOCAL));
TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt64(pCfg, "rpcQueueMemoryAllowed", tsQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * RPC_MEMORY_USAGE_RATIO * 10L, INT64_MAX, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncElectInterval", tsElectInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL));
@ -1569,7 +1576,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfSnodeWriteThreads = pItem->i32; tsNumOfSnodeWriteThreads = pItem->i32;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed"); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "rpcQueueMemoryAllowed");
tsQueueMemoryAllowed = pItem->i64; tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO;
tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO);
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable"); TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "simdEnable");
tsSIMDEnable = (bool)pItem->bval; tsSIMDEnable = (bool)pItem->bval;
@ -2392,6 +2400,12 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
goto _exit; goto _exit;
} }
if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * QUEUE_MEMORY_USAGE_RATIO;
tsApplyMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64 * (1 - QUEUE_MEMORY_USAGE_RATIO);
code = TSDB_CODE_SUCCESS;
goto _exit;
}
if (strcasecmp(name, "numOfCompactThreads") == 0) { if (strcasecmp(name, "numOfCompactThreads") == 0) {
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
@ -2497,7 +2511,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"experimental", &tsExperimental}, {"experimental", &tsExperimental},
{"numOfRpcSessions", &tsNumOfRpcSessions}, {"numOfRpcSessions", &tsNumOfRpcSessions},
{"rpcQueueMemoryAllowed", &tsQueueMemoryAllowed},
{"shellActivityTimer", &tsShellActivityTimer}, {"shellActivityTimer", &tsShellActivityTimer},
{"readTimeout", &tsReadTimeout}, {"readTimeout", &tsReadTimeout},
{"safetyCheckLevel", &tsSafetyCheckLevel}, {"safetyCheckLevel", &tsSafetyCheckLevel},

View File

@ -181,7 +181,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
req.numOfSupportVnodes = tsNumOfSupportVnodes; req.numOfSupportVnodes = tsNumOfSupportVnodes;
req.numOfDiskCfg = tsDiskCfgNum; req.numOfDiskCfg = tsDiskCfgNum;
req.memTotal = tsTotalMemoryKB * 1024; req.memTotal = tsTotalMemoryKB * 1024;
req.memAvail = req.memTotal - tsQueueMemoryAllowed - 16 * 1024 * 1024; req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1); tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);

View File

@ -323,7 +323,7 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
EQItype itype = APPLY_QUEUE == qtype ? DEF_QITEM : RPC_QITEM; EQItype itype = APPLY_QUEUE == qtype ? APPLY_QITEM : RPC_QITEM;
SRpcMsg *pMsg; SRpcMsg *pMsg;
code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg); code = taosAllocateQitem(sizeof(SRpcMsg), itype, pRpc->contLen, (void **)&pMsg);
if (code) { if (code) {

View File

@ -36,7 +36,8 @@ void Testbase::InitLog(const char* path) {
tstrncpy(tsLogDir, path, PATH_MAX); tstrncpy(tsLogDir, path, PATH_MAX);
taosGetSystemInfo(); taosGetSystemInfo();
tsQueueMemoryAllowed = tsTotalMemoryKB * 0.1; tsQueueMemoryAllowed = tsTotalMemoryKB * 0.06;
tsApplyMemoryAllowed = tsTotalMemoryKB * 0.04;
if (taosInitLog("taosdlog", 1, false) != 0) { if (taosInitLog("taosdlog", 1, false) != 0) {
printf("failed to init log file\n"); printf("failed to init log file\n");
} }

View File

@ -257,6 +257,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver); int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq); int32_t tqScanWal(STQ* pTq);
// injection error
void streamMetaFreeTQDuringScanWalError(STQ* pTq);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
// tq-mq // tq-mq

View File

@ -75,12 +75,14 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {
if (pTq == NULL) { if (pTq == NULL) {
return terrno; return terrno;
} }
pVnode->pTq = pTq; pVnode->pTq = pTq;
pTq->pVnode = pVnode;
pTq->path = taosStrdup(path); pTq->path = taosStrdup(path);
if (pTq->path == NULL) { if (pTq->path == NULL) {
return terrno; return terrno;
} }
pTq->pVnode = pVnode;
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (pTq->pHandle == NULL) { if (pTq->pHandle == NULL) {
@ -131,11 +133,19 @@ void tqClose(STQ* pTq) {
return; return;
} }
int32_t vgId = 0;
if (pTq->pVnode != NULL) {
vgId = TD_VID(pTq->pVnode);
} else if (pTq->pStreamMeta != NULL) {
vgId = pTq->pStreamMeta->vgId;
}
// close the stream meta firstly
streamMetaClose(pTq->pStreamMeta);
void* pIter = taosHashIterate(pTq->pPushMgr, NULL); void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
while (pIter) { while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter; STqHandle* pHandle = *(STqHandle**)pIter;
int32_t vgId = TD_VID(pTq->pVnode);
if (pHandle->msg != NULL) { if (pHandle->msg != NULL) {
tqPushEmptyDataRsp(pHandle, vgId); tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont); rpcFreeCont(pHandle->msg->pCont);
@ -151,8 +161,12 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->pOffset); taosHashCleanup(pTq->pOffset);
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1); qDebug("vgId:%d end to close tq", vgId);
streamMetaClose(pTq->pStreamMeta);
#if 0
streamMetaFreeTQDuringScanWalError(pTq);
#endif
taosMemoryFree(pTq); taosMemoryFree(pTq);
} }

View File

@ -22,6 +22,8 @@
typedef struct SBuildScanWalMsgParam { typedef struct SBuildScanWalMsgParam {
int64_t metaId; int64_t metaId;
int32_t numOfTasks; int32_t numOfTasks;
int8_t restored;
SMsgCb msgCb;
} SBuildScanWalMsgParam; } SBuildScanWalMsgParam;
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
@ -74,7 +76,6 @@ int32_t tqScanWal(STQ* pTq) {
static void doStartScanWal(void* param, void* tmrId) { static void doStartScanWal(void* param, void* tmrId) {
int32_t vgId = 0; int32_t vgId = 0;
STQ* pTq = NULL;
int32_t code = 0; int32_t code = 0;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
@ -86,13 +87,29 @@ static void doStartScanWal(void* param, void* tmrId) {
return; return;
} }
if (pMeta->closeFlag) {
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
} else {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
taosMemoryFree(pParam);
return;
}
vgId = pMeta->vgId; vgId = pMeta->vgId;
pTq = pMeta->ahandle;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks, tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored); pParam->restored);
#if 0
// wait for the vnode is freed, and invalid read may occur.
taosMsleep(10000);
#endif
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) { if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
} }
@ -120,6 +137,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
pParam->metaId = pMeta->rid; pParam->metaId = pMeta->rid;
pParam->numOfTasks = numOfTasks; pParam->numOfTasks = numOfTasks;
pParam->restored = pTq->pVnode->restored;
pParam->msgCb = pTq->pVnode->msgCb;
code = streamTimerGetInstance(&pTimer); code = streamTimerGetInstance(&pTimer);
if (code) { if (code) {
@ -330,13 +349,13 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
int32_t vgId = pStreamMeta->vgId; int32_t vgId = pStreamMeta->vgId;
SArray* pTaskList = NULL;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
if (numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// clone the task list, to avoid the task update during scan wal files // clone the task list, to avoid the task update during scan wal files
SArray* pTaskList = NULL;
streamMetaWLock(pStreamMeta); streamMetaWLock(pStreamMeta);
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
streamMetaWUnLock(pStreamMeta); streamMetaWUnLock(pStreamMeta);
@ -447,3 +466,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
} }
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
p->metaId = pTq->pStreamMeta->rid;
p->numOfTasks = 0;
doStartScanWal(p, 0);
}

View File

@ -784,6 +784,7 @@ static bool funcNotSupportStringSma(SFunctionNode* pFunc) {
case FUNCTION_TYPE_SPREAD_PARTIAL: case FUNCTION_TYPE_SPREAD_PARTIAL:
case FUNCTION_TYPE_SPREAD_MERGE: case FUNCTION_TYPE_SPREAD_MERGE:
case FUNCTION_TYPE_TWA: case FUNCTION_TYPE_TWA:
case FUNCTION_TYPE_ELAPSED:
pParam = nodesListGetNode(pFunc->pParameterList, 0); pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (pParam && nodesIsExprNode(pParam) && (IS_VAR_DATA_TYPE(((SExprNode*)pParam)->resType.type))) { if (pParam && nodesIsExprNode(pParam) && (IS_VAR_DATA_TYPE(((SExprNode*)pParam)->resType.type))) {
return true; return true;

View File

@ -2751,6 +2751,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
if (TSDB_CODE_SUCCESS == code && hasData) { if (TSDB_CODE_SUCCESS == code && hasData) {
code = parseInsertTableClause(pCxt, pStmt, &token); code = parseInsertTableClause(pCxt, pStmt, &token);
} }
if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code && pCxt->preCtbname) {
code = TSDB_CODE_TSC_STMT_TBNAME_ERROR;
}
} }
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {

View File

@ -14,4 +14,4 @@ void chkptFailedByRetrieveReqToSource(SStreamTask* pTask, int64_t checkpointId)
// the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode // the checkpoint interval should be 60s, and the next checkpoint req should be issued by mnode
taosMsleep(65*1000); taosMsleep(65*1000);
} }

View File

@ -331,7 +331,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
} else { } else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid); stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
} }
// taosMemoryFree(param);
return; return;
} }

View File

@ -577,6 +577,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pMeta == NULL) { if (pMeta == NULL) {
return; return;
} }
int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid); int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
if (code) { if (code) {
stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code)); stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));

View File

@ -732,7 +732,11 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry); pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry);
if (retry) { if (retry) {
taosMsleep(10); taosMsleep(10);
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index); if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) {
sError("vgId:%d, failed to execute fsm since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
} else {
sDebug("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
}
} }
} while (retry); } while (retry);

View File

@ -14,14 +14,16 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tqueue.h"
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#include "tqueue.h"
#include "tutil.h" #include "tutil.h"
int64_t tsQueueMemoryAllowed = 0; int64_t tsQueueMemoryAllowed = 0;
int64_t tsQueueMemoryUsed = 0; int64_t tsQueueMemoryUsed = 0;
int64_t tsApplyMemoryAllowed = 0;
int64_t tsApplyMemoryUsed = 0;
struct STaosQueue { struct STaosQueue {
STaosQnode *head; STaosQnode *head;
STaosQnode *tail; STaosQnode *tail;
@ -148,20 +150,34 @@ int64_t taosQueueMemorySize(STaosQueue *queue) {
} }
int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) { int32_t taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize, void **item) {
int64_t alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize); int64_t alloced = -1;
if (alloced > tsQueueMemoryAllowed) { if (alloced > tsQueueMemoryAllowed) {
alloced = atomic_add_fetch_64(&tsQueueMemoryUsed, size + dataSize);
if (itype == RPC_QITEM) { if (itype == RPC_QITEM) {
uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced, uError("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsQueueMemoryAllowed); tsQueueMemoryAllowed);
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); (void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
} }
} else if (itype == APPLY_QITEM) {
alloced = atomic_add_fetch_64(&tsApplyMemoryUsed, size + dataSize);
if (alloced > tsApplyMemoryAllowed) {
uDebug("failed to alloc qitem, size:%" PRId64 " alloc:%" PRId64 " allowed:%" PRId64, size + dataSize, alloced,
tsApplyMemoryAllowed);
(void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
return (terrno = TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
}
} }
*item = NULL; *item = NULL;
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size); STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
if (pNode == NULL) { if (pNode == NULL) {
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize); if (itype == RPC_QITEM) {
(void)atomic_sub_fetch_64(&tsQueueMemoryUsed, size + dataSize);
} else if (itype == APPLY_QITEM) {
(void)atomic_sub_fetch_64(&tsApplyMemoryUsed, size + dataSize);
}
return terrno; return terrno;
} }
@ -178,7 +194,12 @@ void taosFreeQitem(void *pItem) {
if (pItem == NULL) return; if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
int64_t alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize); int64_t alloced = -1;
if (pNode->itype == RPC_QITEM) {
alloced = atomic_sub_fetch_64(&tsQueueMemoryUsed, pNode->size + pNode->dataSize);
} else if (pNode->itype == APPLY_QITEM) {
alloced = atomic_sub_fetch_64(&tsApplyMemoryUsed, pNode->size + pNode->dataSize);
}
uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced); uTrace("item:%p, node:%p is freed, alloc:%" PRId64, pItem, pNode, alloced);
taosMemoryFree(pNode); taosMemoryFree(pNode);