Merge branch 'main' into refact/streamsm
Commit: d75948160d
@@ -371,7 +371,7 @@ tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

@@ -401,7 +401,7 @@ properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

@@ -421,7 +421,7 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```go
conf := &tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",

@@ -441,7 +441,7 @@ consumer, err := NewConsumer(conf)
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");
dsn.set("auto.offset.reset", "latest");

let tmq = TmqBuilder::from_dsn(dsn)?;

@@ -467,7 +467,7 @@ consumer = Consumer(
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
}
)
@@ -487,7 +487,7 @@ let consumer = taos.consumer({
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'auto.offset.reset': 'earliest',
'auto.offset.reset': 'latest',
'msg.with.table.name': 'true',
'td.connect.ip': '127.0.0.1',
'td.connect.port': '6030'
@@ -510,7 +510,7 @@ var cfg = new ConsumerConfig
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest",
AutoOffsetReset = "latest",
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
@@ -1093,7 +1093,7 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- httpConnectTimeout: WebSocket connection timeout in milliseconds; the default value is 5000 ms. It only takes effect when using WebSocket type.
- messageWaitTimeout: socket timeout in milliseconds; the default value is 10000 ms. It only takes effect when using WebSocket type.
- httpPoolSize: Maximum number of concurrent requests on a single connection. It only takes effect when using WebSocket type.
- For more information, see [Consumer Parameters](../../../develop/tmq).
- For more information, see [Consumer Parameters](../../../develop/tmq). Note that the default value of auto.offset.reset in data subscription on the TDengine server has changed since version 3.2.0.0.

#### Subscribe to consume data
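Because the server-side default of auto.offset.reset changed in 3.2.0.0, it is safest to pin it explicitly when a consumer is created. A minimal, self-contained Go sketch using only the driver-go calls that appear in this diff; the group, user, and password values are illustrative assumptions, not fixed names:

```go
package main

import (
	"github.com/taosdata/driver-go/v3/af/tmq"
	tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)

func main() {
	// Pin auto.offset.reset explicitly so the starting position does not
	// depend on the server-side default, which changed in TDengine 3.2.0.0.
	consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
		"group.id":            "cgrpName",
		"td.connect.user":     "root",
		"td.connect.pass":     "taosdata",
		"auto.offset.reset":   "latest",
		"msg.with.table.name": "true",
	})
	if err != nil {
		panic(err)
	}
	_ = consumer // subscribe and poll as in the connector examples below
}
```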
@@ -1193,7 +1193,7 @@ public abstract class ConsumerLoop {
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("auto.offset.reset", "earliest");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");

@@ -1276,7 +1276,7 @@ public abstract class ConsumerLoop {
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("auto.offset.reset", "earliest");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");
@@ -794,7 +794,7 @@ The TDengine Go Connector supports subscription functionality with the following
```go
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
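To show how such a consumer is typically driven, here is a short poll loop assembled from the pieces already shown in this diff (Poll and the *tmqcommon.DataMessage event type). It is a fragment that continues the configuration snippet above, assumes the fmt and tmqcommon imports used in the surrounding examples, and the five-iteration bound mirrors the examples below rather than production code:

```go
// Sketch: consume a few events from the consumer created above.
for i := 0; i < 5; i++ {
	ev := consumer.Poll(500) // wait up to 500 ms for the next event
	if ev == nil {
		continue
	}
	switch e := ev.(type) {
	case *tmqcommon.DataMessage:
		fmt.Println(e) // a data block delivered by the subscription
	default:
		fmt.Println(ev) // errors and other event types
	}
}
```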
@@ -870,6 +870,7 @@ package main
import (
"fmt"
"os"
"time"

"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"

@@ -890,19 +891,16 @@ func main() {
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)

@@ -915,10 +913,16 @@ func main() {
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
go func() {
for {
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()

for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {

@@ -972,6 +976,7 @@ package main
import (
"database/sql"
"fmt"
"time"

"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"

@@ -995,7 +1000,7 @@ func main() {
"td.connect.pass": "taosdata",
"group.id": "example",
"client.id": "example_consumer",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
})
if err != nil {
panic(err)

@@ -1004,29 +1009,34 @@ func main() {
if err != nil {
panic(err)
}

_, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
for {
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}

}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
@@ -442,7 +442,7 @@ The following parameters can be configured for the TMQ DSN. Only `group.id` is m

- `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
- `client.id`: Subscriber client ID.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default value varies depending on the TDengine version. For details, see [Data Subscription](https://docs.tdengine.com/develop/tmq/). Note: This parameter is set per consumer group.
- `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential.
- `auto.commit.interval.ms`: Interval for automatic commits.
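For readers following the Go examples in this diff, a rough equivalent of the options above expressed as a driver-go ConfigMap. This is a sketch under assumptions: the key names are taken from the C/Java/Go examples elsewhere in this diff, and whether the Go connector honors "auto.commit.interval.ms" is itself an assumption; the values are illustrative.

```go
package main

import (
	"github.com/taosdata/driver-go/v3/af/tmq"
	tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"
)

func main() {
	// Assumed mapping of the TMQ options described above onto ConfigMap keys.
	consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
		"group.id":                "group1", // consumer group: at-least-once load balancing
		"client.id":               "test",   // subscriber client ID
		"auto.offset.reset":       "latest", // start from the newest message
		"enable.auto.commit":      "true",   // commit offsets automatically
		"auto.commit.interval.ms": "1000",   // assumed auto-commit interval in ms
	})
	if err != nil {
		panic(err)
	}
	_ = consumer
}
```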
@@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"time"

"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"

@@ -27,15 +28,15 @@ func main() {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)

@@ -48,12 +49,17 @@ func main() {
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
go func() {
for {
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
time.Sleep(time.Microsecond * 100)
}
}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(0)
ev := consumer.Poll(500)
if ev != nil {
switch e := ev.(type) {
case *tmqcommon.DataMessage:
@@ -66,7 +66,6 @@ public class SubscribeDemo {
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");

// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {

@@ -66,7 +66,6 @@ public class WebsocketSubscribeDemo {
properties.setProperty(TMQConstants.VALUE_DESERIALIZER,
"com.taos.example.MetersDeserializer");
properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8");
properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true");

// poll data
try (TaosConsumer<Meters> consumer = new TaosConsumer<>(properties)) {
@@ -23,9 +23,6 @@ def taos_get_assignment_and_seek_demo():
consumer = Consumer(
{
"group.id": "0",
# should disable snapshot,
# otherwise it will cause invalid params error
"experimental.snapshot.enable": "false",
}
)
consumer.subscribe(["tmq_assignment_demo_topic"])

@@ -21,9 +21,6 @@ def taosws_get_assignment_and_seek_demo():
prepare()
consumer = taosws.Consumer(conf={
"td.connect.websocket.scheme": "ws",
# should disable snapshot,
# otherwise it will cause invalid params error
"experimental.snapshot.enable": "false",
"group.id": "0",
})
consumer.subscribe(["tmq_assignment_demo_topic"])
@@ -370,7 +370,7 @@ tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);

@@ -400,7 +400,7 @@ properties.setProperty("group.id", "cgrpName");
properties.setProperty("bootstrap.servers", "127.0.0.1:6030");
properties.setProperty("td.connect.user", "root");
properties.setProperty("td.connect.pass", "taosdata");
properties.setProperty("auto.offset.reset", "earliest");
properties.setProperty("auto.offset.reset", "latest");
properties.setProperty("msg.with.table.name", "true");
properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer");

@@ -420,7 +420,7 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
```go
conf := &tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",

@@ -440,7 +440,7 @@ consumer, err := NewConsumer(conf)
let mut dsn: Dsn = "taos://".parse()?;
dsn.set("group.id", "group1");
dsn.set("client.id", "test");
dsn.set("auto.offset.reset", "earliest");
dsn.set("auto.offset.reset", "latest");

let tmq = TmqBuilder::from_dsn(dsn)?;

@@ -468,7 +468,7 @@ consumer = Consumer(
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"msg.with.table.name": "true",
}
)
@@ -488,7 +488,7 @@ let consumer = taos.consumer({
'group.id': 'tg2',
'td.connect.user': 'root',
'td.connect.pass': 'taosdata',
'auto.offset.reset': 'earliest',
'auto.offset.reset': 'latest',
'msg.with.table.name': 'true',
'td.connect.ip': '127.0.0.1',
'td.connect.port': '6030'
@@ -511,7 +511,7 @@ var cfg = new ConsumerConfig
GourpId = "TDengine-TMQ-C#",
TDConnectUser = "root",
TDConnectPasswd = "taosdata",
AutoOffsetReset = "earliest",
AutoOffsetReset = "latest",
MsgWithTableName = "true",
TDConnectIp = "127.0.0.1",
TDConnectPort = "6030"
@@ -1095,7 +1095,7 @@ TaosConsumer consumer = new TaosConsumer<>(config);
- httpConnectTimeout: connection establishment timeout in ms; the default is 5000 ms. Effective only for WebSocket connections.
- messageWaitTimeout: data transfer timeout in ms; the default is 10000 ms. Effective only for WebSocket connections.
- httpPoolSize: maximum number of concurrent requests on a single connection. Effective only for WebSocket connections.
For other parameters, see the [Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group).
For other parameters, see the [Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group). Note that the default value of auto.offset.reset in data subscription has changed since TDengine server version 3.2.0.0.

#### Subscribe to consume data
@@ -1193,7 +1193,7 @@ public abstract class ConsumerLoop {
config.setProperty("bootstrap.servers", "localhost:6030");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("auto.offset.reset", "earliest");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");

@@ -1201,7 +1201,6 @@ public abstract class ConsumerLoop {
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
config.setProperty("experimental.snapshot.enable", "true");

this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");

@@ -1279,7 +1278,7 @@ public abstract class ConsumerLoop {
config.setProperty("bootstrap.servers", "localhost:6041");
config.setProperty("td.connect.user", "root");
config.setProperty("td.connect.pass", "taosdata");
config.setProperty("auto.offset.reset", "earliest");
config.setProperty("auto.offset.reset", "latest");
config.setProperty("msg.with.table.name", "true");
config.setProperty("enable.auto.commit", "true");
config.setProperty("auto.commit.interval.ms", "1000");

@@ -1287,7 +1286,6 @@ public abstract class ConsumerLoop {
config.setProperty("client.id", "1");
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer");
config.setProperty("value.deserializer.encoding", "UTF-8");
config.setProperty("experimental.snapshot.enable", "true");

this.consumer = new TaosConsumer<>(config);
this.topics = Collections.singletonList("topic_speed");
@@ -797,7 +797,7 @@ The TDengine Go connector supports the subscription feature with the following APIs:
```go
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
@@ -873,6 +873,7 @@ package main
import (
"fmt"
"os"
"time"

"github.com/taosdata/driver-go/v3/af"
"github.com/taosdata/driver-go/v3/af/tmq"

@@ -893,19 +894,16 @@ func main() {
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
"group.id": "test",
"auto.offset.reset": "latest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
panic(err)

@@ -918,10 +916,16 @@ func main() {
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
go func() {
for {
_, err = db.Exec("insert into example_tmq.t1 values(now,1)")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}
}()

for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
if ev != nil {

@@ -975,6 +979,7 @@ package main
import (
"database/sql"
"fmt"
"time"

"github.com/taosdata/driver-go/v3/common"
tmqcommon "github.com/taosdata/driver-go/v3/common/tmq"

@@ -998,7 +1003,7 @@ func main() {
"td.connect.pass": "taosdata",
"group.id": "example",
"client.id": "example_consumer",
"auto.offset.reset": "earliest",
"auto.offset.reset": "latest",
})
if err != nil {
panic(err)

@@ -1007,29 +1012,34 @@ func main() {
if err != nil {
panic(err)
}

_, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
go func() {
_, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," +
"c1 bool," +
"c2 tinyint," +
"c3 smallint," +
"c4 int," +
"c5 bigint," +
"c6 tinyint unsigned," +
"c7 smallint unsigned," +
"c8 int unsigned," +
"c9 bigint unsigned," +
"c10 float," +
"c11 double," +
"c12 binary(20)," +
"c13 nchar(20)" +
")")
if err != nil {
panic(err)
}
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
for {
_, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')")
if err != nil {
panic(err)
}
time.Sleep(time.Millisecond * 100)
}

}()
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
@@ -447,7 +447,7 @@ consumer.unsubscribe().await;

- `group.id`: consumers in the same group load-balance messages on an at-least-once basis.
- `client.id`: optional subscriber client identifier.
- `auto.offset.reset`: optional starting point of the subscription; *earliest* subscribes from the beginning, *latest* subscribes only from the newest data; the default is to subscribe from the beginning. Note that this option takes effect only once for a given `group.id`.
- `auto.offset.reset`: optional starting point of the subscription; *earliest* subscribes from the beginning, *latest* subscribes only from the newest data; the default depends on the TDengine version, see [数据订阅](https://docs.taosdata.com/develop/tmq/) for details. Note that this option takes effect only once for a given `group.id`.
- `enable.auto.commit`: when set to `true`, enables automatic commit; it can be used when data consistency is not critical.
- `auto.commit.interval.ms`: interval for automatic commits.
@@ -154,7 +154,7 @@ interactiveFqdn=yes # [yes | no]
verType=server # [server | client]
initType=systemd # [systemd | service | ...]

while getopts "hv:e:i:" arg; do
while getopts "hv:e:" arg; do
case $arg in
e)
#echo "interactiveFqdn=$OPTARG"

@@ -164,10 +164,6 @@ while getopts "hv:e:i:" arg; do
#echo "verType=$OPTARG"
verType=$(echo $OPTARG)
;;
i)
#echo "initType=$OPTARG"
initType=$(echo $OPTARG)
;;
h)
echo "Usage: $(basename $0) -v [server | client] -e [yes | no]"
exit 0

@@ -385,7 +381,10 @@ function set_hostname() {
echo -e -n "${GREEN}Enter the public accessible IP address or fully qualified domain name TDengine will expose to users or applications (must not be 'localhost') :${NC}"
read newHostname
while true; do
if [[ ! -z "$newHostname" && "$newHostname" != "localhost" ]]; then
if [ -z "$newHostname" ]; then
newHostname=$(hostname)
break
elif [ "$newHostname" != "localhost" ]; then
break
else
echo -e -n "${GREEN}Enter the public accessible IP address or fully qualified domain name TDengine will expose to users or applications (must not be 'localhost') :${NC}"

@@ -518,9 +517,7 @@ function install_adapter_config() {

}

function install_config() {

local_fqdn_check
function install_config() {

if [ ! -f "${cfg_install_dir}/${configFile2}" ]; then
${csudo}mkdir -p ${cfg_install_dir}

@@ -542,13 +539,15 @@ function install_config() {

# if ((${update_flag} == 1)); then
# return 0
# fi
if ((${update_flag} == 1)); then
return 0
fi

# if [ "$interactiveFqdn" == "no" ]; then
# return 0
# fi
if [ "$interactiveFqdn" == "no" ]; then
return 0
fi

local_fqdn_check

echo
echo -e -n "${GREEN}Enter FQDN:port (like h1.${emailName2}:6030) of an existing ${productName2} cluster node to join${NC}"

@@ -1073,7 +1072,7 @@ function installProduct() {
echo -e "\033[44;32;1mTo access ${productName2} : ${clientName2} -h $serverFqdn${NC}"
if [ "$verMode" == "cluster" ];then
echo -e "\033[44;32;1mTo access the management system : http://$serverFqdn:6060${NC}"
echo -e "\033[44;32;1mTo read the user manual : http://$serverFqdn:6060/docs${NC}"
echo -e "\033[44;32;1mTo read the user manual : http://$serverFqdn:6060/docs-en${NC}"
fi
echo
else # Only install client
@@ -63,6 +63,10 @@ service_config_dir="/etc/systemd/system"
taos_service_name=${serverName2}
taosadapter_service_name="${clientName2}adapter"
tarbitrator_service_name="tarbitratord"

config_dir="/etc/${clientName2}"

csudo=""
if command -v sudo >/dev/null; then
csudo="sudo "

@@ -264,6 +268,20 @@ function clean_service() {
fi
}

function remove_data_and_config() {
data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
if [ X"$data_dir" == X"" ]; then
data_dir="/var/lib/taos"
fi
log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}`
if [ X"$log_dir" == X"" ]; then
log_dir="/var/lib/taos"
fi
${csudo}rm -rf ${config_dir}/*
${csudo}rm -rf ${data_dir}/*
${csudo}rm -rf ${log_dir}/*
}

function uninstall_taosx() {
if [ -f /usr/local/taosx/uninstall.sh ]; then
cd /usr/local/taosx

@@ -374,6 +392,14 @@ remove_taoskeeper() {
}
remove_taoskeeper

echo
echo "Do you want to remove all the data, log and configuration files? [y/n]"
read answer
if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then
remove_data_and_config
fi

if [ "$verMode" == "cluster" ]; then
uninstall_taosx
fi
(File diff suppressed because it is too large.)
@@ -307,7 +307,6 @@ typedef struct STagScanInfo {
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SLimitNode* pSlimit;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
uint64_t suid;

@@ -318,6 +317,7 @@ typedef struct STagScanInfo {
SArray* aUidTags; // SArray<STUidTagInfo>
SArray* aFilterIdxs; // SArray<int32_t>
SStorageAPI* pStorageAPI;
SLimitInfo limitInfo;
} STagScanInfo;

typedef enum EStreamScanMode {

@@ -3060,7 +3060,12 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;

bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

@@ -3094,28 +3099,20 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
if (++pInfo->curPos >= size) {
setOperatorCompleted(pOperator);
}
// each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason.
if (pInfo->pSlimit != NULL) {
if (pInfo->curPos < pInfo->pSlimit->offset) {
continue;
}
pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name));
if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) {
setOperatorCompleted(pOperator);
}
break;
}
}
pRes->info.rows = count;

pAPI->metaReaderFn.clearReader(&mr);

bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo);
if (bLimitReached) {
setOperatorCompleted(pOperator);
}
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if (pOperator->status == OP_EXEC_DONE) {
setTaskStatus(pTaskInfo, TASK_COMPLETED);
}

pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
pOperator->resultInfo.totalRows += pRes->info.rows;

return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}

@@ -3169,8 +3166,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group

initLimitInfo(pPhyNode->node.pLimit, pPhyNode->node.pSlimit, &pInfo->limitInfo);
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096);
@@ -2730,36 +2730,6 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
return true;
}

static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) {
SLogicNode* pNode = pTableScanNode->pParent;
while (NULL != pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) ||
QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
return NULL;
}
if (NULL != pNode->pSlimit) {
return pNode;
}
pNode = pNode->pParent;
}
return NULL;
}

static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
if (NULL != pTableScanNode->pSlimit) {
return;
}

SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
if (NULL != pNode) {
// TODO: only set the slimit now. push down slimit later
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
}
return;
}

static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized);
if (NULL == pScanNode) {
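To make the offset folding in tagScanOptCloneAncestorSlimit above concrete, a small stand-alone sketch of the arithmetic (plain Go, not TDengine code; the concrete numbers are illustrative assumptions):

```go
package main

import "fmt"

func main() {
	// An ancestor node carries "slimit 2,3": skip 2 groups, then return 3.
	offset, limit := int64(2), int64(3)

	// The optimizer clones the slimit onto the tag scan but folds the offset
	// into the limit: the scan only needs to produce the first offset+limit
	// groups, while the ancestor still performs the actual skipping.
	pushedLimit := limit + offset // 5 groups kept by the scan
	pushedOffset := int64(0)      // no skipping at the scan level

	fmt.Println(pushedLimit, pushedOffset) // prints: 5 0
}
```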
@@ -2795,13 +2765,6 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
pScanNode->node.pTargets = pScanTargets;
}

int32_t code = replaceLogicNode(pLogicSubplan, pAgg, (SLogicNode*)pScanNode);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pAgg->pChildren);
}
nodesDestroyNode((SNode*)pAgg);
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);

pScanNode->onlyMetaCtbIdx = false;

pCxt->optimized = true;
@@ -0,0 +1,206 @@
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from math import inf

class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TD-11204]Difference improvement that can ignore negative
'''
return

def init(self, conn, logSql, replicaVer=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
self._conn = conn

def restartTaosd(self, index=1, dbname="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use tagscan")

def runSingleVgroup(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists tagscan2")
tdSql.execute("create database if not exists tagscan2 vgroups 1")
tdSql.execute('use tagscan2')
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')

tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);")
tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);")
tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);")

tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')

tdSql.query('select tags t1,t2 from stb1 order by t1,t2;')
tdSql.checkRows(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, '1')
tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, '2')
tdSql.checkData(2, 0, 3)
tdSql.checkData(2, 1, '3')
tdSql.checkData(3, 0, 4)
tdSql.checkData(3, 1, '4')
tdSql.checkData(4, 0, 5)
tdSql.checkData(4, 1, '5')
tdSql.checkData(5, 0, 5)
tdSql.checkData(5, 1, '5')

tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;')
tdSql.checkRows(3)

tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 'tb3')

tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 3)
tdSql.checkData(0, 1, '3')
tdSql.checkData(1, 0, 4)
tdSql.checkData(1, 1, '4')
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, '5')

tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.execute('drop database tagscan2')

def runMultiVgroups(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists tagscan")
tdSql.execute("create database if not exists tagscan")
tdSql.execute('use tagscan')
tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);')

tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);")
tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);")
tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);")
tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);")
tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);")
tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);")

tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')
tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);')

tdSql.query('select tags t1,t2 from stb1 order by t1,t2;')
tdSql.checkRows(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, '1')
tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, '2')
tdSql.checkData(2, 0, 3)
tdSql.checkData(2, 1, '3')
tdSql.checkData(3, 0, 4)
tdSql.checkData(3, 1, '4')
tdSql.checkData(4, 0, 5)
tdSql.checkData(4, 1, '5')
tdSql.checkData(5, 0, 5)
tdSql.checkData(5, 1, '5')

tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;')
tdSql.checkRows(3)

tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 'tb3')

tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;')
tdSql.checkRows(3)
tdSql.checkData(0, 0, 3)
tdSql.checkData(0, 1, '3')
tdSql.checkData(1, 0, 4)
tdSql.checkData(1, 1, '4')
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 1, '5')

tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;')
tdSql.checkRows(3)

tdSql.execute('drop database tagscan')

def run(self):
self.runMultiVgroups()
self.runSingleVgroup()

def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)

tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
@@ -1271,6 +1271,7 @@
#develop test
,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py
,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py
,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py

@@ -140,6 +140,7 @@ class TDCom:
self.range_count = 5
self.default_interval = 5
self.stream_timeout = 12
self.create_stream_sleep = 0.5
self.record_history_ts = str()
self.precision = "ms"
self.date_time = self.genTs(precision=self.precision)[0]

@@ -881,6 +882,7 @@ class TDCom:
stream_options += f" ignore update 0"
if not use_except:
tdSql.execute(f'create stream if not exists {stream_name} trigger at_once {stream_options} {fill_history} into {des_table} {subtable} as {source_sql} {fill};')
time.sleep(self.create_stream_sleep)
return None
else:
return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table} {subtable} as {source_sql} {fill};'

@@ -906,6 +908,7 @@ class TDCom:
stream_options += f" ignore update 0"
if not use_except:
tdSql.execute(f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};')
time.sleep(self.create_stream_sleep)
return None
else:
return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};'

@@ -1566,8 +1569,8 @@ class TDCom:
res1 = tdSql.queryResult
tdSql.query(sql2)
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
tdSql.sql = sql1
new_list = list()

if tag_value_list:
res1 = self.float_handle(res1)
res2 = self.float_handle(res2)

@@ -1602,6 +1605,7 @@ class TDCom:
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
tdSql.sql = sql1

if tag_value_list:
res1 = self.float_handle(res1)

@@ -1643,6 +1647,7 @@ class TDCom:
tdSql.query(sql2)
# res2 = tdSql.queryResult
res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult
tdSql.sql = sql1

if tag_value_list:
res1 = self.float_handle(res1)
@@ -338,11 +338,38 @@ class TDTestCase:

tdLog.printNoPrefix("======== test case 1 end ...... ")

#
def checkVGroups(self):

# db2
tdSql.execute("create database db2 vgroups 2;")
tdSql.execute("use db2;")
tdSql.execute("create table st(ts timestamp, age int) tags(area int);")
tdSql.execute("create table t1 using st tags(1);")
tdSql.query("select distinct(tbname) from st limit 1 offset 100;")
tdSql.checkRows(0)
tdLog.info("check db2 vgroups 2 limit 1 offset 100 successfully!")

# db1
tdSql.execute("create database db1 vgroups 1;")
tdSql.execute("use db1;")
tdSql.execute("create table st(ts timestamp, age int) tags(area int);")
tdSql.execute("create table t1 using st tags(1);")
tdSql.query("select distinct(tbname) from st limit 1 offset 100;")
tdSql.checkRows(0)
tdLog.info("check db1 vgroups 1 limit 1 offset 100 successfully!")

def run(self):
# tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()

# one vgroup diff more than one vgroup check
self.checkVGroups()

def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")