Merge pull request #3086 from taosdata/develop
Merge from develop into master
This commit is contained in:
commit
3e024c8ec1
|
@ -33,11 +33,7 @@ IF (TD_LINUX_64)
|
|||
ADD_DEFINITIONS(-D_M_X64)
|
||||
ADD_DEFINITIONS(-D_TD_LINUX_64)
|
||||
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g3 -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
|
||||
|
||||
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
|
||||
IF (ICONV_INCLUDE_EXIST)
|
||||
ADD_DEFINITIONS(-DUSE_LIBICONV)
|
||||
ENDIF ()
|
||||
ADD_DEFINITIONS(-DUSE_LIBICONV)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_LINUX_32)
|
||||
|
@ -50,6 +46,7 @@ IF (TD_ARM_64)
|
|||
ADD_DEFINITIONS(-D_M_X64)
|
||||
ADD_DEFINITIONS(-D_TD_ARM_64_)
|
||||
ADD_DEFINITIONS(-D_TD_ARM_)
|
||||
ADD_DEFINITIONS(-DUSE_LIBICONV)
|
||||
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -g -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
|
||||
ENDIF ()
|
||||
|
||||
|
@ -133,6 +130,7 @@ ENDIF ()
|
|||
|
||||
IF (TD_WINDOWS_32)
|
||||
ADD_DEFINITIONS(-D_TD_WINDOWS_32)
|
||||
ADD_DEFINITIONS(-DUSE_LIBICONV)
|
||||
ENDIF ()
|
||||
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
|
|
|
@ -295,27 +295,30 @@ $ taos
|
|||
这时,因为电流超过了10A,您应该可以看到示例程序将它输出到了屏幕上。
|
||||
您可以继续插入一些数据观察示例程序的输出。
|
||||
|
||||
### jdbc使用数据订阅功能
|
||||
### Java 使用数据订阅功能
|
||||
|
||||
(1)使用订阅功能前的数据准备
|
||||
订阅功能也提供了 Java 开发接口,相关说明请见 [Java Connector](https://www.taosdata.com/cn/documentation20/connector/)。需要注意的是,目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。
|
||||
|
||||
```shell
|
||||
# 创建power库
|
||||
下面以一个示例程序介绍其具体使用方法。它所完成的功能与前面介绍的 C 语言示例基本相同,也是订阅数据库中所有电流超过 10A 的记录。
|
||||
|
||||
#### 准备数据
|
||||
|
||||
```sql
|
||||
# 创建 power 库
|
||||
taos> create database power;
|
||||
# 切换库
|
||||
taos> use power;
|
||||
# 创建超级表
|
||||
taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupI
|
||||
d int);
|
||||
taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
|
||||
# 创建表
|
||||
taos> create table d1001 using meters tags ("Beijing.Chaoyang",2);
|
||||
taos> create table d1002 using meters tags ("Beijing.Haidian",2);
|
||||
taos> create table d1001 using meters tags ("Beijing.Chaoyang", 2);
|
||||
taos> create table d1002 using meters tags ("Beijing.Haidian", 2);
|
||||
# 插入测试数据
|
||||
taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
|
||||
taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
|
||||
# 从超级表meters查询current大于10的数据
|
||||
# 从超级表 meters 查询电流大于 10A 的记录
|
||||
taos> select * from meters where current > 10;
|
||||
ts | current | voltage | phase| location | groupid |
|
||||
ts | current | voltage | phase | location | groupid |
|
||||
===========================================================================================================
|
||||
2020-08-15 12:10:00.000 | 10.30000 | 220 | 1 | Beijing.Haidian | 2 |
|
||||
2020-08-15 12:20:00.000 | 11.20000 | 220 | 1 | Beijing.Haidian | 2 |
|
||||
|
@ -325,70 +328,50 @@ taos> select * from meters where current > 10;
|
|||
Query OK, 5 row(s) in set (0.004896s)
|
||||
```
|
||||
|
||||
(2)使用jdbc提供的订阅功能
|
||||
#### 示例程序
|
||||
|
||||
```java
|
||||
public class SubscribeDemo {
|
||||
private static final String topic = "topic_meter_current_bg_10";
|
||||
private static final String topic = "topic-meter-current-bg-10";
|
||||
private static final String sql = "select * from meters where current > 10";
|
||||
|
||||
public static void main(String[] args) {
|
||||
Connection connection = null;
|
||||
Statement statement = null;
|
||||
TSDBSubscribe subscribe = null;
|
||||
long subscribeId = 0;
|
||||
|
||||
try {
|
||||
// 加载驱动
|
||||
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||
// 获取Connectin
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
|
||||
properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
|
||||
String jdbcUrl = "jdbc:TAOS://127.0.0.1:6030/power?user=root&password=taosdata";
|
||||
connection = DriverManager.getConnection(jdbcUrl, properties);
|
||||
System.out.println("create the connection");
|
||||
// 创建Subscribe
|
||||
subscribe = ((TSDBConnection) connection).createSubscribe();
|
||||
// subscribe订阅topic,topic为主题名称,sql为查询语句,restart代表是否每次订阅接受历史数据
|
||||
subscribeId = subscribe.subscribe(topic, sql, true);
|
||||
System.out.println("create a subscribe topic: " + topic + "@[" + subscribeId + "]");
|
||||
subscribe = ((TSDBConnection) connection).subscribe(topic, sql, true); // 创建订阅
|
||||
int count = 0;
|
||||
while (true) {
|
||||
// 消费数据
|
||||
TSDBResultSet resultSet = subscribe.consume(subscribeId);
|
||||
// 打印结果集
|
||||
if (resultSet != null) {
|
||||
ResultSetMetaData metaData = resultSet.getMetaData();
|
||||
while (resultSet.next()) {
|
||||
int columnCount = metaData.getColumnCount();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
System.out.print(metaData.getColumnLabel(i) + " : " + resultSet.getString(i) + "\t");
|
||||
}
|
||||
System.out.println("\n====================");
|
||||
count++;
|
||||
}
|
||||
while (count < 10) {
|
||||
TimeUnit.SECONDS.sleep(1); // 等待1秒,避免频繁调用 consume,给服务端造成压力
|
||||
TSDBResultSet resultSet = subscribe.consume(); // 消费数据
|
||||
if (resultSet == null) {
|
||||
continue;
|
||||
}
|
||||
ResultSetMetaData metaData = resultSet.getMetaData();
|
||||
while (resultSet.next()) {
|
||||
int columnCount = metaData.getColumnCount();
|
||||
for (int i = 1; i <= columnCount; i++) {
|
||||
System.out.print(metaData.getColumnLabel(i) + ": " + resultSet.getString(i) + "\t");
|
||||
}
|
||||
System.out.println();
|
||||
count++;
|
||||
}
|
||||
if (count > 10)
|
||||
break;
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
try {
|
||||
if (null != subscribe && subscribeId != 0) {
|
||||
subscribe.unsubscribe(subscribeId, true);
|
||||
System.out.println("unsubscribe the top@[" + subscribeId + "]");
|
||||
}
|
||||
if (statement != null) {
|
||||
statement.close();
|
||||
System.out.println("close the statement.");
|
||||
}
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
System.out.println("close the connection.");
|
||||
}
|
||||
if (null != subscribe)
|
||||
subscribe.close(true); // 关闭订阅
|
||||
if (connection != null)
|
||||
connection.close();
|
||||
} catch (SQLException throwables) {
|
||||
throwables.printStackTrace();
|
||||
}
|
||||
|
@ -397,38 +380,30 @@ public class SubscribeDemo {
|
|||
}
|
||||
```
|
||||
|
||||
(3)订阅功能演示
|
||||
|
||||
运行demo,首先,subscribe会将满足情况的历史数据消费
|
||||
运行示例程序,首先,它会消费符合查询条件的所有历史数据:
|
||||
|
||||
```shell
|
||||
# java -jar subscribe.jar
|
||||
|
||||
ts : 1597464000000 current : 12.0 voltage : 220 phase : 1 location : Beijing.Chaoyang groupid : 2
|
||||
====================
|
||||
ts : 1597464600000 current : 12.3 voltage : 220 phase : 2 location : Beijing.Chaoyang groupid : 2
|
||||
====================
|
||||
ts : 1597465200000 current : 12.2 voltage : 220 phase : 1 location : Beijing.Chaoyang groupid : 2
|
||||
====================
|
||||
ts : 1597464600000 current : 10.3 voltage : 220 phase : 1 location : Beijing.Haidian groupid : 2
|
||||
====================
|
||||
ts : 1597465200000 current : 11.2 voltage : 220 phase : 1 location : Beijing.Haidian groupid : 2
|
||||
====================
|
||||
ts: 1597464000000 current: 12.0 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid : 2
|
||||
ts: 1597464600000 current: 12.3 voltage: 220 phase: 2 location: Beijing.Chaoyang groupid : 2
|
||||
ts: 1597465200000 current: 12.2 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid : 2
|
||||
ts: 1597464600000 current: 10.3 voltage: 220 phase: 1 location: Beijing.Haidian groupid : 2
|
||||
ts: 1597465200000 current: 11.2 voltage: 220 phase: 1 location: Beijing.Haidian groupid : 2
|
||||
```
|
||||
|
||||
接着,使用taos客户端向表中新增数据
|
||||
接着,使用 taos 客户端向表中新增一条数据:
|
||||
|
||||
```shell
|
||||
```sql
|
||||
# taos
|
||||
taos> use power;
|
||||
taos> insert into d1001 values("2020-08-15 12:40:00.000", 12.4, 220, 1);
|
||||
```
|
||||
|
||||
查看数据消费情况
|
||||
因为这条数据的电流大于10A,示例程序会将其消费:
|
||||
|
||||
```shell
|
||||
ts : 1597466400000 current : 12.4 voltage : 220 phase : 1 location : Beijing.Chaoyang groupid : 2
|
||||
====================
|
||||
ts: 1597466400000 current: 12.4 voltage: 220 phase: 1 location: Beijing.Chaoyang groupid: 2
|
||||
```
|
||||
|
||||
|
||||
|
|
|
@ -1795,19 +1795,22 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
|||
|
||||
if (pQueryHandle->checkFiles) {
|
||||
bool exists = true;
|
||||
|
||||
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
pQueryHandle->activeIndex = 0;
|
||||
pQueryHandle->checkFiles = false;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
if (exists) {
|
||||
elapsedTime = taosGetTimestampUs() - stime;
|
||||
pQueryHandle->cost.checkForNextTime += elapsedTime;
|
||||
pQueryHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
|
||||
return exists;
|
||||
}
|
||||
|
||||
pQueryHandle->activeIndex = 0;
|
||||
pQueryHandle->checkFiles = false;
|
||||
pQueryHandle->checkFiles = false;
|
||||
}
|
||||
|
||||
// TODO: opt by consider the scan order
|
||||
|
|
|
@ -135,7 +135,6 @@ cd ../../../debug; make
|
|||
./test.sh -f general/parser/limit2.sim
|
||||
./test.sh -f general/parser/fill.sim
|
||||
./test.sh -f general/parser/fill_stb.sim
|
||||
#./test.sh -f general/parser/fill_us.sim
|
||||
./test.sh -f general/parser/where.sim
|
||||
./test.sh -f general/parser/slimit.sim
|
||||
./test.sh -f general/parser/select_with_tags.sim
|
||||
|
@ -143,7 +142,6 @@ cd ../../../debug; make
|
|||
./test.sh -f general/parser/tags_dynamically_specifiy.sim
|
||||
./test.sh -f general/parser/groupby.sim
|
||||
./test.sh -f general/parser/set_tag_vals.sim
|
||||
#./test.sh -f general/parser/sliding.sim
|
||||
./test.sh -f general/parser/tags_filter.sim
|
||||
./test.sh -f general/parser/slimit_alter_tags.sim
|
||||
./test.sh -f general/parser/join.sim
|
||||
|
@ -151,6 +149,8 @@ cd ../../../debug; make
|
|||
./test.sh -f general/parser/binary_escapeCharacter.sim
|
||||
./test.sh -f general/parser/bug.sim
|
||||
./test.sh -f general/parser/repeatAlter.sim
|
||||
./test.sh -f general/parser/union.sim
|
||||
./test.sh -f general/parser/topbot.sim
|
||||
|
||||
./test.sh -f general/stable/disk.sim
|
||||
./test.sh -f general/stable/dnode3.sim
|
||||
|
|
Loading…
Reference in New Issue