diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx
index 92db7d4cbf..c85109d3c5 100644
--- a/docs/en/07-develop/07-tmq.mdx
+++ b/docs/en/07-develop/07-tmq.mdx
@@ -222,7 +222,7 @@ A database including one supertable and two subtables is created as follows:
```sql
DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb;
-CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
+CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16)) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00');
diff --git a/docs/en/07-develop/_sub_java.mdx b/docs/en/07-develop/_sub_java.mdx
index d14b5fd609..a928fa8836 100644
--- a/docs/en/07-develop/_sub_java.mdx
+++ b/docs/en/07-develop/_sub_java.mdx
@@ -1,11 +1,24 @@
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
-{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
-{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
-```
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
-```
-```java
-{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
-```
\ No newline at end of file
+
+
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+ ```
+
+
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+ ```
+ ```java
+ {{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+ ```
+
+
diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx
index 61ce166069..36992da636 100644
--- a/docs/en/14-reference/03-connector/04-java.mdx
+++ b/docs/en/14-reference/03-connector/04-java.mdx
@@ -696,6 +696,9 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- enable.auto.commit: Specifies whether to commit automatically.
- group.id: consumer: Specifies the group that the consumer is in.
- value.deserializer: To deserialize the results, you can inherit `com.taosdata.jdbc.tmq.ReferenceDeserializer` and specify the result set bean. You can also inherit `com.taosdata.jdbc.tmq.Deserializer` and perform custom deserialization based on the SQL result set.
+- td.connect.type: Specifies the type connect with TDengine, `jni` or `WebSocket`. default is `jni`
+- httpConnectTimeout:WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type.
+- messageWaitTimeout:socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type.
- For more information, see [Consumer Parameters](../../../develop/tmq).
#### Subscribe to consume data
@@ -724,6 +727,11 @@ For more information, see [Data Subscription](../../../develop/tmq).
### Usage examples
+
+
+
+In addition to the native connection, the Java Connector also supports subscribing via websocket.
+
```java
public abstract class ConsumerLoop {
private final TaosConsumer consumer;
@@ -795,6 +803,87 @@ public abstract class ConsumerLoop {
}
```
+
+
+
+```java
+public abstract class ConsumerLoop {
+ private final TaosConsumer consumer;
+ private final List topics;
+ private final AtomicBoolean shutdown;
+ private final CountDownLatch shutdownLatch;
+
+ public ConsumerLoop() throws SQLException {
+ Properties config = new Properties();
+ config.setProperty("bootstrap.servers", "localhost:6041");
+ config.setProperty("td.connect.type", "ws");
+ config.setProperty("msg.with.table.name", "true");
+ config.setProperty("enable.auto.commit", "true");
+ config.setProperty("group.id", "group2");
+ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
+
+ this.consumer = new TaosConsumer<>(config);
+ this.topics = Collections.singletonList("topic_speed");
+ this.shutdown = new AtomicBoolean(false);
+ this.shutdownLatch = new CountDownLatch(1);
+ }
+
+ public abstract void process(ResultBean result);
+
+ public void pollData() throws SQLException {
+ try {
+ consumer.subscribe(topics);
+
+ while (!shutdown.get()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ResultBean record : records) {
+ process(record);
+ }
+ }
+ consumer.unsubscribe();
+ } finally {
+ consumer.close();
+ shutdownLatch.countDown();
+ }
+ }
+
+ public void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ shutdownLatch.await();
+ }
+
+ public static class ResultDeserializer extends ReferenceDeserializer {
+
+ }
+
+ public static class ResultBean {
+ private Timestamp ts;
+ private int speed;
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public int getSpeed() {
+ return speed;
+ }
+
+ public void setSpeed(int speed) {
+ this.speed = speed;
+ }
+ }
+}
+```
+
+
+
+
+> **Note**: The value of value.deserializer should be adjusted based on the package path of the test environment.
+
### Use with connection pool
#### HikariCP
@@ -878,8 +967,8 @@ The source code of the sample application is under `TDengine/examples/JDBC`:
| taos-jdbcdriver version | major changes |
| :---------------------: | :--------------------------------------------: |
-| 3.0.3 | fix timestamp resolution error for REST connection in jdk17+ version |
-| 3.0.1 - 3.0.2 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use 3.0.2 in the JDK 8 environment |
+| 3.1.0 | JDBC REST connection supports subscription over WebSocket |
+| 3.0.1 - 3.0.4 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use other version in the JDK 8 environment |
| 3.0.0 | Support for TDengine 3.0 |
| 2.0.42 | fix wasNull interface return value in WebSocket connection |
| 2.0.41 | fix decode method of username and password in REST connection |
diff --git a/docs/examples/java/pom.xml b/docs/examples/java/pom.xml
index 634c3f75a8..01d0ce936c 100644
--- a/docs/examples/java/pom.xml
+++ b/docs/examples/java/pom.xml
@@ -1,6 +1,7 @@
-
4.0.0
@@ -17,13 +18,13 @@
-
+
com.taosdata.jdbc
taos-jdbcdriver
- 3.0.0
+ 3.1.0
-
+
junit
junit
@@ -32,4 +33,4 @@
-
+
\ No newline at end of file
diff --git a/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java
new file mode 100644
index 0000000000..d953a73641
--- /dev/null
+++ b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java
@@ -0,0 +1,79 @@
+package com.taos.example;
+
+import com.taosdata.jdbc.tmq.ConsumerRecords;
+import com.taosdata.jdbc.tmq.TMQConstants;
+import com.taosdata.jdbc.tmq.TaosConsumer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class WebsocketSubscribeDemo {
+ private static final String TOPIC = "tmq_topic_ws";
+ private static final String DB_NAME = "meters_ws";
+ private static final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ public static void main(String[] args) {
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+ public void run() {
+ shutdown.set(true);
+ }
+ }, 3_000);
+ try {
+ // prepare
+ Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
+ String jdbcUrl = "jdbc:TAOS-RS://127.0.0.1:6041/?user=root&password=taosdata&batchfetch=true";
+ try (Connection connection = DriverManager.getConnection(jdbcUrl);
+ Statement statement = connection.createStatement()) {
+ statement.executeUpdate("drop topic if exists " + TOPIC);
+ statement.executeUpdate("drop database if exists " + DB_NAME);
+ statement.executeUpdate("create database " + DB_NAME);
+ statement.executeUpdate("use " + DB_NAME);
+ statement.executeUpdate(
+ "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT) TAGS (`groupid` INT, `location` BINARY(24))");
+ statement.executeUpdate("CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')");
+ statement.executeUpdate("INSERT INTO `d0` values(now - 10s, 0.32, 116)");
+ statement.executeUpdate("INSERT INTO `d0` values(now - 8s, NULL, NULL)");
+ statement.executeUpdate(
+ "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119)");
+ statement.executeUpdate(
+ "INSERT INTO `d1` values (now-8s, 10, 120) (now - 6s, 10, 119) (now - 4s, 11.2, 118)");
+ // create topic
+ statement.executeUpdate("create topic " + TOPIC + " as select * from meters");
+ }
+
+ // create consumer
+ Properties properties = new Properties();
+ properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
+ properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
+ properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
+ properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
+ properties.setProperty(TMQConstants.GROUP_ID, "test");
+ properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
+ "com.taos.example.MetersDeserializer");
+
+ // poll data
+ try (TaosConsumer consumer = new TaosConsumer<>(properties)) {
+ consumer.subscribe(Collections.singletonList(TOPIC));
+ while (!shutdown.get()) {
+ ConsumerRecords meters = consumer.poll(Duration.ofMillis(100));
+ for (Meters meter : meters) {
+ System.out.println(meter);
+ }
+ }
+ consumer.unsubscribe();
+ }
+ } catch (ClassNotFoundException | SQLException e) {
+ e.printStackTrace();
+ }
+ timer.cancel();
+ }
+}
diff --git a/docs/examples/java/src/test/java/com/taos/test/TestAll.java b/docs/examples/java/src/test/java/com/taos/test/TestAll.java
index 8d201da074..f24156d8b1 100644
--- a/docs/examples/java/src/test/java/com/taos/test/TestAll.java
+++ b/docs/examples/java/src/test/java/com/taos/test/TestAll.java
@@ -64,21 +64,15 @@ public class TestAll {
@Test
public void testSubscribe() {
-
- Thread thread = new Thread(() -> {
- try {
- Thread.sleep(1000);
- insertData();
- } catch (SQLException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- thread.start();
SubscribeDemo.main(args);
}
+
+ @Test
+ public void testSubscribeOverWebsocket() {
+ WebsocketSubscribeDemo.main(args);
+ }
+
@Test
public void testSchemaless() throws SQLException {
LineProtocolExample.main(args);
diff --git a/docs/zh/07-develop/_sub_java.mdx b/docs/zh/07-develop/_sub_java.mdx
index e7de158cc8..e5e46b2af0 100644
--- a/docs/zh/07-develop/_sub_java.mdx
+++ b/docs/zh/07-develop/_sub_java.mdx
@@ -1,3 +1,5 @@
+
+
```java
{{#include docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java}}
```
@@ -6,4 +8,17 @@
```
```java
{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
-```
\ No newline at end of file
+```
+
+
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java}}
+```
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/MetersDeserializer.java}}
+```
+```java
+{{#include docs/examples/java/src/main/java/com/taos/example/Meters.java}}
+```
+
+
diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx
index fc6dc57138..061475f51e 100644
--- a/docs/zh/08-connector/14-java.mdx
+++ b/docs/zh/08-connector/14-java.mdx
@@ -699,7 +699,10 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- enable.auto.commit: 是否允许自动提交。
- group.id: consumer: 所在的 group。
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
-- 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
+- td.connect.type: 连接方式。jni:表示使用动态库连接的方式,ws/WebSocket:表示使用 WebSocket 进行数据通信。默认为 jni 方式。
+- httpConnectTimeout:创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。
+- messageWaitTimeout:数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。
+其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group)
#### 订阅消费数据
@@ -727,6 +730,9 @@ consumer.close()
### 使用示例如下:
+
+
+
```java
public abstract class ConsumerLoop {
private final TaosConsumer consumer;
@@ -798,6 +804,89 @@ public abstract class ConsumerLoop {
}
```
+
+
+
+除了原生的连接方式,Java 连接器还支持通过 WebSocket 订阅数据。
+
+```java
+public abstract class ConsumerLoop {
+ private final TaosConsumer consumer;
+ private final List topics;
+ private final AtomicBoolean shutdown;
+ private final CountDownLatch shutdownLatch;
+
+ public ConsumerLoop() throws SQLException {
+ Properties config = new Properties();
+ config.setProperty("bootstrap.servers", "localhost:6041");
+ config.setProperty("td.connect.type", "ws");
+ config.setProperty("msg.with.table.name", "true");
+ config.setProperty("enable.auto.commit", "true");
+ config.setProperty("group.id", "group2");
+ config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
+
+ this.consumer = new TaosConsumer<>(config);
+ this.topics = Collections.singletonList("topic_speed");
+ this.shutdown = new AtomicBoolean(false);
+ this.shutdownLatch = new CountDownLatch(1);
+ }
+
+ public abstract void process(ResultBean result);
+
+ public void pollData() throws SQLException {
+ try {
+ consumer.subscribe(topics);
+
+ while (!shutdown.get()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ResultBean record : records) {
+ process(record);
+ }
+ }
+ consumer.unsubscribe();
+ } finally {
+ consumer.close();
+ shutdownLatch.countDown();
+ }
+ }
+
+ public void shutdown() throws InterruptedException {
+ shutdown.set(true);
+ shutdownLatch.await();
+ }
+
+ public static class ResultDeserializer extends ReferenceDeserializer {
+
+ }
+
+ public static class ResultBean {
+ private Timestamp ts;
+ private int speed;
+
+ public Timestamp getTs() {
+ return ts;
+ }
+
+ public void setTs(Timestamp ts) {
+ this.ts = ts;
+ }
+
+ public int getSpeed() {
+ return speed;
+ }
+
+ public void setSpeed(int speed) {
+ this.speed = speed;
+ }
+ }
+}
+```
+
+
+
+
+> **注意**:这里的 value.deserializer 配置参数值应该根据测试环境的包路径做相应的调整。
+
### 与连接池使用
#### HikariCP
@@ -881,8 +970,8 @@ public static void main(String[] args) throws Exception {
| taos-jdbcdriver 版本 | 主要变化 |
| :------------------: | :----------------------------: |
-| 3.0.3 | 修复 REST 连接在 jdk17+ 版本时间戳解析错误问题 |
-| 3.0.1 - 3.0.2 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用 3.0.2 版本 |
+| 3.1.0 | WebSocket 连接支持订阅功能 |
+| 3.0.1 - 3.0.4 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用其他版本 |
| 3.0.0 | 支持 TDengine 3.0 |
| 2.0.42 | 修在 WebSocket 连接中 wasNull 接口返回值 |
| 2.0.41 | 修正 REST 连接中用户名和密码转码方式 |
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 0ea78a00ff..2a40976a8b 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -205,8 +205,7 @@ typedef struct SDataBlockInfo {
STimeWindow calWin; // used for stream, do not serialize
TSKEY watermark; // used for stream
- char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
- STag* pTag; // used for stream partition
+ char parTbName[TSDB_TABLE_NAME_LEN]; // used for stream partition
} SDataBlockInfo;
typedef struct SSDataBlock {
@@ -292,7 +291,6 @@ typedef struct STableBlockDistInfo {
uint16_t numOfFiles;
uint32_t numOfTables;
uint32_t numOfBlocks;
- uint32_t numOfVgroups;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
@@ -302,6 +300,7 @@ typedef struct STableBlockDistInfo {
int32_t firstSeekTimeUs;
uint32_t numOfInmemRows;
uint32_t numOfSmallBlocks;
+ uint32_t numOfVgroups;
int32_t blockRowsHisto[20];
} STableBlockDistInfo;
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index 98fcc2effa..cd63f7d278 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -2850,7 +2850,7 @@ typedef struct {
char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
- char idxName[TSDB_COL_NAME_LEN];
+ char idxName[TSDB_INDEX_FNAME_LEN];
int8_t idxType;
} SCreateTagIndexReq;
diff --git a/packaging/docker/DockerfileCloud b/packaging/docker/DockerfileCloud
deleted file mode 100644
index fa8fcabf34..0000000000
--- a/packaging/docker/DockerfileCloud
+++ /dev/null
@@ -1,30 +0,0 @@
-FROM ubuntu:18.04
-
-WORKDIR /root
-
-ARG pkgFile
-ARG dirName
-ARG cpuType
-RUN echo ${pkgFile} && echo ${dirName}
-
-RUN apt update
-RUN apt install -y curl
-
-COPY ${pkgFile} /root/
-ENV TINI_VERSION v0.19.0
-ENV TAOS_DISABLE_ADAPTER 1
-ADD https://github.com/krallin/tini/releases/download/${TINI_VERSION}/tini-${cpuType} /tini
-ENV DEBIAN_FRONTEND=noninteractive
-WORKDIR /root/
-RUN tar -zxf ${pkgFile} && cd /root/${dirName}/ && /bin/bash install.sh -e no && cd /root && rm /root/${pkgFile} && rm -rf /root/${dirName} && apt-get update && apt-get install -y locales tzdata netcat && locale-gen en_US.UTF-8 && apt-get clean && rm -rf /var/lib/apt/lists/ && chmod +x /tini
-
-ENV LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/lib" \
- LC_CTYPE=en_US.UTF-8 \
- LANG=en_US.UTF-8 \
- LC_ALL=en_US.UTF-8
-COPY ./run.sh /usr/bin/
-COPY ./bin/* /usr/bin/
-
-ENTRYPOINT ["/tini", "--", "/usr/bin/entrypoint.sh"]
-CMD ["bash", "-c", "/usr/bin/run.sh"]
-VOLUME [ "/var/lib/taos", "/var/log/taos" ]
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 7bb4a3def9..692837d928 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -4684,6 +4684,11 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfVgroups = 1;
// find the start data block in file
+
+ tsdbAcquireReader(pReader);
+ if (pReader->suspended) {
+ tsdbReaderResume(pReader);
+ }
SReaderStatus* pStatus = &pReader->status;
STsdbCfg* pc = &pReader->pTsdb->pVnode->config.tsdbCfg;
@@ -4745,7 +4750,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
// pReader->pFileGroup->fid, pReader->idStr);
}
-
+ tsdbReleaseReader(pReader);
return code;
}
diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c
index 16b43b560c..e9c46843c0 100644
--- a/source/libs/executor/src/dataInserter.c
+++ b/source/libs/executor/src/dataInserter.c
@@ -230,12 +230,19 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB:
- uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
- ASSERT(0);
+ qError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
+ terrno = TSDB_CODE_APP_ERROR;
+ goto _end;
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (colDataIsNull_s(pColInfoData, j)) {
+ if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
+ qError("NULL value for primary key");
+ terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
+ goto _end;
+ }
+
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);
} else {
@@ -256,7 +263,8 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
}
} else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
- ASSERT(0);
+ terrno = TSDB_CODE_APP_ERROR;
+ goto _end;
}
break;
}
@@ -296,7 +304,7 @@ _end:
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
taosMemoryFree(pReq);
}
- return TSDB_CODE_FAILED;
+ return terrno;
}
*ppReq = pReq;
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index a04e67d8ab..9986af1691 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -5423,7 +5423,6 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
pDistInfo->numOfBlocks += p1.numOfBlocks;
pDistInfo->numOfTables += p1.numOfTables;
pDistInfo->numOfInmemRows += p1.numOfInmemRows;
- pDistInfo->numOfVgroups += p1.numOfVgroups;
pDistInfo->totalSize += p1.totalSize;
pDistInfo->totalRows += p1.totalRows;
pDistInfo->numOfFiles += p1.numOfFiles;
@@ -5441,6 +5440,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) {
}
pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
+ pDistInfo->numOfVgroups += (p1.numOfTables != 0 ? 1 : 0);
for (int32_t i = 0; i < tListLen(pDistInfo->blockRowsHisto); ++i) {
pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i];
}
@@ -5459,7 +5459,6 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist
if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1;
if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1;
- if (tEncodeU32(&encoder, pInfo->numOfVgroups) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->totalRows) < 0) return -1;
@@ -5492,7 +5491,6 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo
if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1;
if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1;
- if (tDecodeU32(&decoder, &pInfo->numOfVgroups) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->totalRows) < 0) return -1;
diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c
index 25e65d2588..bb71fd7652 100644
--- a/source/libs/scalar/src/filter.c
+++ b/source/libs/scalar/src/filter.c
@@ -3120,9 +3120,8 @@ static FORCE_INLINE bool filterExecuteImplIsNull(void *pinfo, int32_t numOfRows,
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
- void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
- p[i] = ((colData == NULL) || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
+ p[i] = colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
if (p[i] == 0) {
all = false;
} else {
@@ -3146,9 +3145,8 @@ static FORCE_INLINE bool filterExecuteImplNotNull(void *pinfo, int32_t numOfRows
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
- void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
- p[i] = ((colData != NULL) && !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL));
+ p[i] = !colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL);
if (p[i] == 0) {
all = false;
} else {
@@ -3178,13 +3176,13 @@ bool filterExecuteImplRange(void *pinfo, int32_t numOfRows, SColumnInfoData *pRe
for (int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData *pData = info->cunits[0].colData;
- void *colData = colDataGetData(pData, i);
- if (colData == NULL || colDataIsNull_s(pData, i)) {
+ if (colDataIsNull_s(pData, i)) {
all = false;
p[i] = 0;
continue;
}
+ void *colData = colDataGetData(pData, i);
p[i] = (*rfunc)(colData, colData, valData, valData2, func);
if (p[i] == 0) {
@@ -3210,13 +3208,13 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes
for (int32_t i = 0; i < numOfRows; ++i) {
uint32_t uidx = info->groups[0].unitIdxs[0];
- void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
- if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
+ if (colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) {
p[i] = 0;
all = false;
continue;
}
+ void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i);
// match/nmatch for nchar type need convert from ucs4 to mbs
if (info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR &&
(info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)) {
@@ -3274,7 +3272,7 @@ bool filterExecuteImpl(void *pinfo, int32_t numOfRows, SColumnInfoData *pRes, SC
if (!isNull) {
colData = colDataGetData((SColumnInfoData *)(cunit->colData), i);
}
-
+
if (colData == NULL || isNull) {
p[i] = optr == OP_TYPE_IS_NULL ? true : false;
} else {
diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c
index 331d241745..3d63da7ba3 100644
--- a/source/os/src/osDir.c
+++ b/source/os/src/osDir.c
@@ -89,6 +89,8 @@ typedef struct dirent TdDirEntry;
#endif
+#define TDDIRMAXLEN 1024
+
void taosRemoveDir(const char *dirname) {
TdDirPtr pDir = taosOpenDir(dirname);
if (pDir == NULL) return;
@@ -133,8 +135,8 @@ int32_t taosMkDir(const char *dirname) {
}
int32_t taosMulMkDir(const char *dirname) {
- if (dirname == NULL) return -1;
- char temp[1024];
+ if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1;
+ char temp[TDDIRMAXLEN];
char *pos = temp;
int32_t code = 0;
#ifdef WINDOWS
@@ -192,8 +194,8 @@ int32_t taosMulMkDir(const char *dirname) {
}
int32_t taosMulModeMkDir(const char *dirname, int mode) {
- if (dirname == NULL) return -1;
- char temp[1024];
+ if (dirname == NULL || strlen(dirname) >= TDDIRMAXLEN) return -1;
+ char temp[TDDIRMAXLEN];
char *pos = temp;
int32_t code = 0;
#ifdef WINDOWS
@@ -204,8 +206,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
#endif
if (taosDirExist(temp)) {
- chmod(temp, mode);
- return code;
+ return chmod(temp, mode);
}
if (strncmp(temp, TD_DIRSEP, 1) == 0) {
@@ -247,12 +248,10 @@ int32_t taosMulModeMkDir(const char *dirname, int mode) {
}
if (code < 0 && errno == EEXIST) {
- chmod(temp, mode);
- return 0;
+ return chmod(temp, mode);
}
- chmod(temp, mode);
- return code;
+ return chmod(temp, mode);
}
void taosRemoveOldFiles(const char *dirname, int32_t keepDays) {
diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c
index 8cc9885adb..a14c8fd4c9 100644
--- a/source/os/src/osFile.c
+++ b/source/os/src/osFile.c
@@ -132,15 +132,20 @@ int64_t taosCopyFile(const char *from, const char *to) {
if (bytes < sizeof(buffer)) break;
}
- taosFsyncFile(pFileTo);
+ int code = taosFsyncFile(pFileTo);
taosCloseFile(&pFileFrom);
taosCloseFile(&pFileTo);
+
+ if (code != 0) {
+ return -1;
+ }
return size;
_err:
if (pFileFrom != NULL) taosCloseFile(&pFileFrom);
if (pFileTo != NULL) taosCloseFile(&pFileTo);
+ /* coverity[+retval] */
taosRemoveFile(to);
return -1;
#endif
@@ -506,13 +511,13 @@ int64_t taosPWriteFile(TdFilePtr pFile, const void *buf, int64_t count, int64_t
}
int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
+ if (pFile == NULL || pFile->fd < 0) {
+ return -1;
+ }
#if FILE_WITH_LOCK
taosThreadRwlockRdlock(&(pFile->rwlock));
#endif
ASSERT(pFile->fd >= 0); // Please check if you have closed the file.
- if (pFile->fd < 0) {
- return -1;
- }
#ifdef WINDOWS
int64_t ret = _lseeki64(pFile->fd, offset, whence);
#else
diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c
index fac547ca99..7d2c8aa4e5 100644
--- a/source/os/src/osSocket.c
+++ b/source/os/src/osSocket.c
@@ -745,8 +745,10 @@ bool taosValidIpAndPort(uint32_t ip, uint16_t port) {
#endif
serverAdd.sin_port = (uint16_t)htons(port);
- if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
- // printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
+ fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (fd < 0) { // exception
+ return false;
+ } else if (fd <= 2) { // in, out, err
taosCloseSocketNoCheck1(fd);
return false;
}
diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c
index 6471dad033..52309a7b35 100644
--- a/source/os/src/osSysinfo.c
+++ b/source/os/src/osSysinfo.c
@@ -439,11 +439,14 @@ int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores) {
if (code != 0 && (done & 1) == 0) {
TdFilePtr pFile1 = taosOpenFile("/proc/device-tree/model", TD_FILE_READ | TD_FILE_STREAM);
- if (pFile1 == NULL) return code;
- taosGetsFile(pFile1, maxLen, cpuModel);
- taosCloseFile(&pFile1);
- code = 0;
- done |= 1;
+ if (pFile1 != NULL) {
+ ssize_t bytes = taosGetsFile(pFile1, maxLen, cpuModel);
+ taosCloseFile(&pFile);
+ if (bytes > 0) {
+ code = 0;
+ done |= 1;
+ }
+ }
}
if (code != 0 && (done & 1) == 0) {
@@ -498,7 +501,7 @@ void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
curSysTotal = curSysUsed + sysCpu.idle;
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
- if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
+ if (curSysTotal - lastSysTotal > 0 && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
if (cpu_system != NULL) {
*cpu_system = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
}
@@ -610,12 +613,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) {
}
}
- if (strlen(line) < 0) {
- // printf("read file:%s failed", tsProcMemFile);
- taosCloseFile(&pFile);
- return -1;
- }
-
char tmp[10];
sscanf(line, "%s %" PRId64, tmp, usedKB);
diff --git a/source/os/src/osTimezone.c b/source/os/src/osTimezone.c
index ab5600744c..ad223bff27 100644
--- a/source/os/src/osTimezone.c
+++ b/source/os/src/osTimezone.c
@@ -909,7 +909,7 @@ void taosGetSystemTimezone(char *outTimezoneStr, enum TdTimezone *tsTimezone) {
char buf[4096] = {0};
char *tz = NULL;
{
- int n = readlink("/etc/localtime", buf, sizeof(buf));
+ int n = readlink("/etc/localtime", buf, sizeof(buf)-1);
if (n < 0) {
printf("read /etc/localtime error, reason:%s", strerror(errno));
diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh
index a793ef4436..b6cb4f6280 100755
--- a/tests/script/sh/checkAsan.sh
+++ b/tests/script/sh/checkAsan.sh
@@ -55,7 +55,7 @@ python_error=`cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l`
# /home/chr/TDengine/source/libs/scalar/src/filter.c:3149:14: runtime error: applying non-zero offset 18446744073709551615 to null pointer
# /home/TDinternal/community/source/libs/scalar/src/sclvector.c:1109:66: runtime error: signed integer overflow: 9223372034707292160 + 1676867897049 cannot be represented in type 'long int'
-runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |grep -v "filter.c:3149:14" |wc -l`
+runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type"| grep -v "signed integer overflow" |grep -v "strerror.c"| grep -v "asan_malloc_linux.cc" |wc -l`
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"
diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py
index 7f5169361c..c14d3b27b1 100644
--- a/tests/system-test/7-tmq/subscribeDb4.py
+++ b/tests/system-test/7-tmq/subscribeDb4.py
@@ -31,7 +31,8 @@ class TDTestCase:
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
- 'showRow': 1}
+ 'showRow': 1,
+ 'snapshot': 1}
cdbName = 'cdb'
# some parameter to consumer processor
@@ -42,7 +43,7 @@ class TDTestCase:
ifManualCommit = 1
groupId = 'group.id:cgrp1'
autoCommit = 'enable.auto.commit:true'
- autoCommitInterval = 'auto.commit.interval.ms:1000'
+ autoCommitInterval = 'auto.commit.interval.ms:100'
autoOffset = 'auto.offset.reset:earliest'
pollDelay = 20
@@ -86,7 +87,7 @@ class TDTestCase:
tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit)
tdLog.info("start consume processor")
- tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
+ tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName,0,0,self.paraDict["snapshot"])
tdLog.info("After waiting for a commit notify, drop one stable")
#time.sleep(3)
diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py
index 4cda062401..895da95e5d 100644
--- a/tests/system-test/7-tmq/tmqCommon.py
+++ b/tests/system-test/7-tmq/tmqCommon.py
@@ -151,7 +151,7 @@ class TMQCom:
if tdSql.getData(i, 1) == 0:
loopFlag = 0
break
- time.sleep(0.1)
+ time.sleep(0.02)
return
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb',rows=2):
@@ -165,7 +165,7 @@ class TMQCom:
if tdSql.getData(i, 1) == 1:
loopFlag = 0
break
- time.sleep(0.1)
+ time.sleep(0.02)
return
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c
index 54d31cdb74..85ed2a7ac7 100644
--- a/tools/shell/src/shellEngine.c
+++ b/tools/shell/src/shellEngine.c
@@ -845,6 +845,8 @@ void shellReadHistory() {
i = (i + SHELL_MAX_HISTORY_SIZE - 1) % SHELL_MAX_HISTORY_SIZE;
}
taosFprintfFile(pFile, "%s\n", pHistory->hist[endIndex]);
+
+ /* coverity[+retval] */
taosFsyncFile(pFile);
taosCloseFile(&pFile);
}