diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index ecadb5a499..d85db04f5c 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -371,7 +371,7 @@ tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); -tmq_conf_set(conf, "auto.offset.reset", "earliest"); +tmq_conf_set(conf, "auto.offset.reset", "latest"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -401,7 +401,7 @@ properties.setProperty("group.id", "cgrpName"); properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); properties.setProperty("td.connect.user", "root"); properties.setProperty("td.connect.pass", "taosdata"); -properties.setProperty("auto.offset.reset", "earliest"); +properties.setProperty("auto.offset.reset", "latest"); properties.setProperty("msg.with.table.name", "true"); properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); @@ -421,7 +421,7 @@ public class MetersDeserializer extends ReferenceDeserializer { ```go conf := &tmq.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -441,7 +441,7 @@ consumer, err := NewConsumer(conf) let mut dsn: Dsn = "taos://".parse()?; dsn.set("group.id", "group1"); dsn.set("client.id", "test"); -dsn.set("auto.offset.reset", "earliest"); +dsn.set("auto.offset.reset", "latest"); let tmq = TmqBuilder::from_dsn(dsn)?; @@ -467,7 +467,7 @@ consumer = Consumer( "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "msg.with.table.name": "true", } ) @@ -487,7 +487,7 @@ let consumer = taos.consumer({ 'group.id': 'tg2', 'td.connect.user': 'root', 'td.connect.pass': 'taosdata', - 'auto.offset.reset','earliest', + 'auto.offset.reset','latest', 'msg.with.table.name': 'true', 'td.connect.ip','127.0.0.1', 'td.connect.port','6030' @@ -510,7 +510,7 @@ var cfg = new ConsumerConfig GourpId = "TDengine-TMQ-C#", TDConnectUser = "root", TDConnectPasswd = "taosdata", - AutoOffsetReset = "earliest" + AutoOffsetReset = "latest" MsgWithTableName = "true", TDConnectIp = "127.0.0.1", TDConnectPort = "6030" diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index 85bc67d0fd..fd43dd67fa 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -1093,7 +1093,7 @@ TaosConsumer consumer = new TaosConsumer<>(config); - httpConnectTimeout: WebSocket connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using WebSocket type. - messageWaitTimeout: socket timeout in milliseconds, the default value is 10000 ms. It only takes effect when using WebSocket type. - httpPoolSize: Maximum number of concurrent requests on the a connection。It only takes effect when using WebSocket type. -- For more information, see [Consumer Parameters](../../../develop/tmq). +- For more information, see [Consumer Parameters](../../../develop/tmq). Note that the default value of auto.offset.reset in data subscription on the TDengine server has changed since version 3.2.0.0. #### Subscribe to consume data @@ -1193,7 +1193,7 @@ public abstract class ConsumerLoop { config.setProperty("bootstrap.servers", "localhost:6030"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); - config.setProperty("auto.offset.reset", "earliest"); + config.setProperty("auto.offset.reset", "latest"); config.setProperty("msg.with.table.name", "true"); config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); @@ -1276,7 +1276,7 @@ public abstract class ConsumerLoop { config.setProperty("bootstrap.servers", "localhost:6041"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); - config.setProperty("auto.offset.reset", "earliest"); + config.setProperty("auto.offset.reset", "latest"); config.setProperty("msg.with.table.name", "true"); config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); diff --git a/docs/en/14-reference/03-connector/05-go.mdx b/docs/en/14-reference/03-connector/05-go.mdx index b3d4857d75..a0be7a4a02 100644 --- a/docs/en/14-reference/03-connector/05-go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -794,7 +794,7 @@ The TDengine Go Connector supports subscription functionality with the following ```go consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -870,6 +870,7 @@ package main import ( "fmt" "os" + "time" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" @@ -890,19 +891,16 @@ func main() { if err != nil { panic(err) } - if err != nil { - panic(err) - } consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_client", - "enable.auto.commit": "false", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "msg.with.table.name": "true", }) if err != nil { panic(err) @@ -915,10 +913,16 @@ func main() { if err != nil { panic(err) } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } + go func() { + for { + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) + } + }() + for i := 0; i < 5; i++ { ev := consumer.Poll(500) if ev != nil { @@ -972,6 +976,7 @@ package main import ( "database/sql" "fmt" + "time" "github.com/taosdata/driver-go/v3/common" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" @@ -995,7 +1000,7 @@ func main() { "td.connect.pass": "taosdata", "group.id": "example", "client.id": "example_consumer", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", }) if err != nil { panic(err) @@ -1004,29 +1009,34 @@ func main() { if err != nil { panic(err) } + + _, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," + + "c1 bool," + + "c2 tinyint," + + "c3 smallint," + + "c4 int," + + "c5 bigint," + + "c6 tinyint unsigned," + + "c7 smallint unsigned," + + "c8 int unsigned," + + "c9 bigint unsigned," + + "c10 float," + + "c11 double," + + "c12 binary(20)," + + "c13 nchar(20)" + + ")") + if err != nil { + panic(err) + } go func() { - _, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," + - "c1 bool," + - "c2 tinyint," + - "c3 smallint," + - "c4 int," + - "c5 bigint," + - "c6 tinyint unsigned," + - "c7 smallint unsigned," + - "c8 int unsigned," + - "c9 bigint unsigned," + - "c10 float," + - "c11 double," + - "c12 binary(20)," + - "c13 nchar(20)" + - ")") - if err != nil { - panic(err) - } - _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") - if err != nil { - panic(err) + for { + _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) } + }() for i := 0; i < 5; i++ { ev := consumer.Poll(500) diff --git a/docs/en/14-reference/03-connector/06-rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx index a98683d43c..5a44b161cb 100644 --- a/docs/en/14-reference/03-connector/06-rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -442,7 +442,7 @@ The following parameters can be configured for the TMQ DSN. Only `group.id` is m - `group.id`: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis. - `client.id`: Subscriber client ID. -- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group. +- `auto.offset.reset`: Initial point of subscription. *earliest* subscribes from the beginning, and *latest* subscribes from the newest message. The default value varies depending on the TDengine version. For details, see [Data Subscription](https://docs.tdengine.com/develop/tmq/). Note: This parameter is set per consumer group. - `enable.auto.commit`: Automatically commits. This can be enabled when data consistency is not essential. - `auto.commit.interval.ms`: Interval for automatic commits. diff --git a/docs/examples/go/sub/main.go b/docs/examples/go/sub/main.go index ed335cfdea..41cb33e94d 100644 --- a/docs/examples/go/sub/main.go +++ b/docs/examples/go/sub/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "time" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" @@ -27,15 +28,15 @@ func main() { panic(err) } consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_client", - "enable.auto.commit": "false", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "msg.with.table.name": "true", }) if err != nil { panic(err) @@ -48,12 +49,17 @@ func main() { if err != nil { panic(err) } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } + go func() { + for { + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") + if err != nil { + panic(err) + } + time.Sleep(time.Microsecond * 100) + } + }() for i := 0; i < 5; i++ { - ev := consumer.Poll(0) + ev := consumer.Poll(500) if ev != nil { switch e := ev.(type) { case *tmqcommon.DataMessage: diff --git a/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java b/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java index 3c5d2867e2..8162c30ff6 100644 --- a/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java +++ b/docs/examples/java/src/main/java/com/taos/example/SubscribeDemo.java @@ -66,7 +66,6 @@ public class SubscribeDemo { properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taos.example.MetersDeserializer"); properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8"); - properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true"); // poll data try (TaosConsumer consumer = new TaosConsumer<>(properties)) { diff --git a/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java index 03f7e3a11e..7df15f1af6 100644 --- a/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java +++ b/docs/examples/java/src/main/java/com/taos/example/WebsocketSubscribeDemo.java @@ -66,7 +66,6 @@ public class WebsocketSubscribeDemo { properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taos.example.MetersDeserializer"); properties.setProperty(TMQConstants.VALUE_DESERIALIZER_ENCODING, "UTF-8"); - properties.setProperty(TMQConstants.EXPERIMENTAL_SNAPSHOT_ENABLE, "true"); // poll data try (TaosConsumer consumer = new TaosConsumer<>(properties)) { diff --git a/docs/examples/python/tmq_assignment_example.py b/docs/examples/python/tmq_assignment_example.py index 41737e3fc4..c370db47a5 100644 --- a/docs/examples/python/tmq_assignment_example.py +++ b/docs/examples/python/tmq_assignment_example.py @@ -23,9 +23,6 @@ def taos_get_assignment_and_seek_demo(): consumer = Consumer( { "group.id": "0", - # should disable snapshot, - # otherwise it will cause invalid params error - "experimental.snapshot.enable": "false", } ) consumer.subscribe(["tmq_assignment_demo_topic"]) diff --git a/docs/examples/python/tmq_websocket_assgnment_example.py b/docs/examples/python/tmq_websocket_assgnment_example.py index 0f8e4a2804..a180ef840e 100644 --- a/docs/examples/python/tmq_websocket_assgnment_example.py +++ b/docs/examples/python/tmq_websocket_assgnment_example.py @@ -21,9 +21,6 @@ def taosws_get_assignment_and_seek_demo(): prepare() consumer = taosws.Consumer(conf={ "td.connect.websocket.scheme": "ws", - # should disable snapshot, - # otherwise it will cause invalid params error - "experimental.snapshot.enable": "false", "group.id": "0", }) consumer.subscribe(["tmq_assignment_demo_topic"]) diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index 8e43631c9a..7134abaa44 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -370,7 +370,7 @@ tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); tmq_conf_set(conf, "group.id", "cgrpName"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); -tmq_conf_set(conf, "auto.offset.reset", "earliest"); +tmq_conf_set(conf, "auto.offset.reset", "latest"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); @@ -400,7 +400,7 @@ properties.setProperty("group.id", "cgrpName"); properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); properties.setProperty("td.connect.user", "root"); properties.setProperty("td.connect.pass", "taosdata"); -properties.setProperty("auto.offset.reset", "earliest"); +properties.setProperty("auto.offset.reset", "latest"); properties.setProperty("msg.with.table.name", "true"); properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); @@ -420,7 +420,7 @@ public class MetersDeserializer extends ReferenceDeserializer { ```go conf := &tmq.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -440,7 +440,7 @@ consumer, err := NewConsumer(conf) let mut dsn: Dsn = "taos://".parse()?; dsn.set("group.id", "group1"); dsn.set("client.id", "test"); -dsn.set("auto.offset.reset", "earliest"); +dsn.set("auto.offset.reset", "latest"); let tmq = TmqBuilder::from_dsn(dsn)?; @@ -468,7 +468,7 @@ consumer = Consumer( "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "msg.with.table.name": "true", } ) @@ -488,7 +488,7 @@ let consumer = taos.consumer({ 'group.id': 'tg2', 'td.connect.user': 'root', 'td.connect.pass': 'taosdata', - 'auto.offset.reset','earliest', + 'auto.offset.reset','latest', 'msg.with.table.name': 'true', 'td.connect.ip','127.0.0.1', 'td.connect.port','6030' @@ -511,7 +511,7 @@ var cfg = new ConsumerConfig GourpId = "TDengine-TMQ-C#", TDConnectUser = "root", TDConnectPasswd = "taosdata", - AutoOffsetReset = "earliest" + AutoOffsetReset = "latest" MsgWithTableName = "true", TDConnectIp = "127.0.0.1", TDConnectPort = "6030" diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index 7b6db8ee69..6ff92bce18 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -1095,7 +1095,7 @@ TaosConsumer consumer = new TaosConsumer<>(config); - httpConnectTimeout: 创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。 - messageWaitTimeout: 数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。 - httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。 - 其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group) + 其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group), 注意TDengine服务端自3.2.0.0版本开始消息订阅中的auto.offset.reset默认值发生变化。 #### 订阅消费数据 @@ -1193,7 +1193,7 @@ public abstract class ConsumerLoop { config.setProperty("bootstrap.servers", "localhost:6030"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); - config.setProperty("auto.offset.reset", "earliest"); + config.setProperty("auto.offset.reset", "latest"); config.setProperty("msg.with.table.name", "true"); config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); @@ -1201,7 +1201,6 @@ public abstract class ConsumerLoop { config.setProperty("client.id", "1"); config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer"); config.setProperty("value.deserializer.encoding", "UTF-8"); - config.setProperty("experimental.snapshot.enable", "true"); this.consumer = new TaosConsumer<>(config); this.topics = Collections.singletonList("topic_speed"); @@ -1279,7 +1278,7 @@ public abstract class ConsumerLoop { config.setProperty("bootstrap.servers", "localhost:6041"); config.setProperty("td.connect.user", "root"); config.setProperty("td.connect.pass", "taosdata"); - config.setProperty("auto.offset.reset", "earliest"); + config.setProperty("auto.offset.reset", "latest"); config.setProperty("msg.with.table.name", "true"); config.setProperty("enable.auto.commit", "true"); config.setProperty("auto.commit.interval.ms", "1000"); @@ -1287,7 +1286,6 @@ public abstract class ConsumerLoop { config.setProperty("client.id", "1"); config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ConsumerLoop$ResultDeserializer"); config.setProperty("value.deserializer.encoding", "UTF-8"); - config.setProperty("experimental.snapshot.enable", "true"); this.consumer = new TaosConsumer<>(config); this.topics = Collections.singletonList("topic_speed"); diff --git a/docs/zh/08-connector/20-go.mdx b/docs/zh/08-connector/20-go.mdx index 90ef4d83ca..3994278eef 100644 --- a/docs/zh/08-connector/20-go.mdx +++ b/docs/zh/08-connector/20-go.mdx @@ -797,7 +797,7 @@ TDengine Go 连接器支持订阅功能,应用 API 如下: ```go consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ "group.id": "test", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", "td.connect.ip": "127.0.0.1", "td.connect.user": "root", "td.connect.pass": "taosdata", @@ -873,6 +873,7 @@ package main import ( "fmt" "os" + "time" "github.com/taosdata/driver-go/v3/af" "github.com/taosdata/driver-go/v3/af/tmq" @@ -893,19 +894,16 @@ func main() { if err != nil { panic(err) } - if err != nil { - panic(err) - } consumer, err := tmq.NewConsumer(&tmqcommon.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_client", - "enable.auto.commit": "false", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "latest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_client", + "enable.auto.commit": "false", + "msg.with.table.name": "true", }) if err != nil { panic(err) @@ -918,10 +916,16 @@ func main() { if err != nil { panic(err) } - _, err = db.Exec("insert into example_tmq.t1 values(now,1)") - if err != nil { - panic(err) - } + go func() { + for { + _, err = db.Exec("insert into example_tmq.t1 values(now,1)") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) + } + }() + for i := 0; i < 5; i++ { ev := consumer.Poll(500) if ev != nil { @@ -975,6 +979,7 @@ package main import ( "database/sql" "fmt" + "time" "github.com/taosdata/driver-go/v3/common" tmqcommon "github.com/taosdata/driver-go/v3/common/tmq" @@ -998,7 +1003,7 @@ func main() { "td.connect.pass": "taosdata", "group.id": "example", "client.id": "example_consumer", - "auto.offset.reset": "earliest", + "auto.offset.reset": "latest", }) if err != nil { panic(err) @@ -1007,29 +1012,34 @@ func main() { if err != nil { panic(err) } + + _, err = db.Exec("create table example_ws_tmq.t_all(ts timestamp," + + "c1 bool," + + "c2 tinyint," + + "c3 smallint," + + "c4 int," + + "c5 bigint," + + "c6 tinyint unsigned," + + "c7 smallint unsigned," + + "c8 int unsigned," + + "c9 bigint unsigned," + + "c10 float," + + "c11 double," + + "c12 binary(20)," + + "c13 nchar(20)" + + ")") + if err != nil { + panic(err) + } go func() { - _, err := db.Exec("create table example_ws_tmq.t_all(ts timestamp," + - "c1 bool," + - "c2 tinyint," + - "c3 smallint," + - "c4 int," + - "c5 bigint," + - "c6 tinyint unsigned," + - "c7 smallint unsigned," + - "c8 int unsigned," + - "c9 bigint unsigned," + - "c10 float," + - "c11 double," + - "c12 binary(20)," + - "c13 nchar(20)" + - ")") - if err != nil { - panic(err) - } - _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") - if err != nil { - panic(err) + for { + _, err = db.Exec("insert into example_ws_tmq.t_all values(now,true,2,3,4,5,6,7,8,9,10.123,11.123,'binary','nchar')") + if err != nil { + panic(err) + } + time.Sleep(time.Millisecond * 100) } + }() for i := 0; i < 5; i++ { ev := consumer.Poll(500) diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index 018552117e..37b1787707 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -447,7 +447,7 @@ consumer.unsubscribe().await; - `group.id`: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。 - `client.id`: 可选的订阅客户端识别项。 -- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 `group.id` 中仅生效一次。 +- `auto.offset.reset`: 可选初始化订阅起点, *earliest* 为从头开始订阅, *latest* 为仅从最新数据开始订阅,默认值根据 TDengine 版本有所不同,详细参见 [数据订阅](https://docs.taosdata.com/develop/tmq/)。注意,此选项在同一个 `group.id` 中仅生效一次。 - `enable.auto.commit`: 当设置为 `true` 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。 - `auto.commit.interval.ms`: 自动标记的时间间隔。 diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index 449f6f6cdb..f731e8c280 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -154,7 +154,7 @@ interactiveFqdn=yes # [yes | no] verType=server # [server | client] initType=systemd # [systemd | service | ...] -while getopts "hv:e:i:" arg; do +while getopts "hv:e:" arg; do case $arg in e) #echo "interactiveFqdn=$OPTARG" @@ -164,10 +164,6 @@ while getopts "hv:e:i:" arg; do #echo "verType=$OPTARG" verType=$(echo $OPTARG) ;; - i) - #echo "initType=$OPTARG" - initType=$(echo $OPTARG) - ;; h) echo "Usage: $(basename $0) -v [server | client] -e [yes | no]" exit 0 @@ -385,7 +381,10 @@ function set_hostname() { echo -e -n "${GREEN}Enter the public accessible IP address or fully qualified domain name TDengine will expose to users or applications (must not be 'localhost') :${NC}" read newHostname while true; do - if [[ ! -z "$newHostname" && "$newHostname" != "localhost" ]]; then + if [ -z "$newHostname" ]; then + newHostname=$(hostname) + break + elif [ "$newHostname" != "localhost" ]; then break else echo -e -n "${GREEN}Enter the public accessible IP address or fully qualified domain name TDengine will expose to users or applications (must not be 'localhost') :${NC}" @@ -518,9 +517,7 @@ function install_adapter_config() { } -function install_config() { - - local_fqdn_check +function install_config() { if [ ! -f "${cfg_install_dir}/${configFile2}" ]; then ${csudo}mkdir -p ${cfg_install_dir} @@ -542,13 +539,15 @@ function install_config() { - # if ((${update_flag} == 1)); then - # return 0 - # fi + if ((${update_flag} == 1)); then + return 0 + fi - # if [ "$interactiveFqdn" == "no" ]; then - # return 0 - # fi + if [ "$interactiveFqdn" == "no" ]; then + return 0 + fi + + local_fqdn_check echo echo -e -n "${GREEN}Enter FQDN:port (like h1.${emailName2}:6030) of an existing ${productName2} cluster node to join${NC}" @@ -1073,7 +1072,7 @@ function installProduct() { echo -e "\033[44;32;1mTo access ${productName2} : ${clientName2} -h $serverFqdn${NC}" if [ "$verMode" == "cluster" ];then echo -e "\033[44;32;1mTo access the management system : http://$serverFqdn:6060${NC}" - echo -e "\033[44;32;1mTo read the user manual : http://$serverFqdn:6060/docs${NC}" + echo -e "\033[44;32;1mTo read the user manual : http://$serverFqdn:6060/docs-en${NC}" fi echo else # Only install client diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index b83e8d74c9..5b5a7d7c30 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -63,6 +63,10 @@ service_config_dir="/etc/systemd/system" taos_service_name=${serverName2} taosadapter_service_name="${clientName2}adapter" tarbitrator_service_name="tarbitratord" + +config_dir="/etc/${clientName2}" + + csudo="" if command -v sudo >/dev/null; then csudo="sudo " @@ -264,6 +268,20 @@ function clean_service() { fi } +function remove_data_and_config() { + data_dir=`grep dataDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` + if [ X"$data_dir" == X"" ]; then + data_dir="/var/lib/taos" + fi + log_dir=`grep logDir /etc/taos/taos.cfg | grep -v '#' | tail -n 1 | awk {'print $2'}` + if [ X"$log_dir" == X"" ]; then + log_dir="/var/lib/taos" + fi + ${csudo}rm -rf ${config_dir}/* + ${csudo}rm -rf ${data_dir}/* + ${csudo}rm -rf ${log_dir}/* +} + function uninstall_taosx() { if [ -f /usr/local/taosx/uninstall.sh ]; then cd /usr/local/taosx @@ -374,6 +392,14 @@ remove_taoskeeper() { } remove_taoskeeper +echo +echo "Do you want to remove all the data, log and configuration files? [y/n]" +read answer +if [ X$answer == X"y" ] || [ X$answer == X"Y" ]; then + remove_data_and_config +fi + + if [ "$verMode" == "cluster" ]; then uninstall_taosx fi diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index e7ba30d78c..58dbe3793d 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -15,15 +15,14 @@ #include "cJSON.h" #include "clientInt.h" -#include "clientLog.h" #include "parser.h" #include "tdatablock.h" #include "tdef.h" #include "tglobal.h" #include "tmsgtype.h" -#include "tqueue.h" -#include "tref.h" -#include "ttimer.h" + +#define LOG_ID_TAG "connId:0x%"PRIx64",reqId:0x%"PRIx64 +#define LOG_ID_VALUE *(int64_t*)taos,pRequest->requestId static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } @@ -32,6 +31,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { + uError("create json object failed") return NULL; } cJSON* type = cJSON_CreateString("create"); @@ -106,6 +106,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { json = cJSON_CreateObject(); if (json == NULL) { + uError("create json object failed"); goto end; } cJSON* type = cJSON_CreateString("alter"); @@ -192,7 +193,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { SDecoder coder; char* string = NULL; - uDebug("processCreateStb called"); + uDebug("create stable data:%p", metaRsp); // decode and process req void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); @@ -202,8 +203,8 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { goto _err; } string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); - uDebug("processCreateStb %s", string); _err: + uDebug("create stable return, sql json:%s", string); tDecoderClear(&coder); return string; } @@ -212,7 +213,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; char* string = NULL; - uDebug("processAlterStb called"); + uDebug("alter stable data:%p", metaRsp); // decode and process req void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); @@ -223,9 +224,8 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { goto _err; } string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); - uDebug("processAlterStb %s", string); - _err: + uDebug("alter stable return, sql json:%s", string); tDecoderClear(&coder); return string; } @@ -251,12 +251,14 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { SArray* pTagVals = NULL; int32_t code = tTagToValArray(pTag, &pTagVals); if (code) { + uError("tTagToValArray failed code:%d", code); goto end; } if (tTagIsJson(pTag)) { STag* p = (STag*)pTag; if (p->nTag == 0) { + uError("p->nTag == 0"); goto end; } char* pJson = parseTagDatatoJson(pTag); @@ -322,6 +324,7 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { + uError("create json object failed"); return NULL; } cJSON* type = cJSON_CreateString("create"); @@ -353,7 +356,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { SVCreateTbReq* pCreateReq; char* string = NULL; // decode - uDebug("processCreateTable called"); + uDebug("create table data:%p", metaRsp); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); tDecoderInit(&decoder, data, len); @@ -370,10 +373,10 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); } - uDebug("processCreateTable :%s", string); } _exit: + uDebug("create table return, sql json:%s", string); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -387,9 +390,9 @@ _exit: } static char* processAutoCreateTable(STaosxRsp* rsp) { - uDebug("processAutoCreateTable called"); + uDebug("auto create table data:%p", rsp); if (rsp->createTableNum <= 0) { - uError("WriteRaw:processAutoCreateTable rsp->createTableNum <= 0"); + uError("processAutoCreateTable rsp->createTableNum <= 0"); goto _exit; } @@ -408,13 +411,13 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { } if (pCreateReq[iReq].type != TSDB_CHILD_TABLE) { - uError("WriteRaw:processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE"); + uError("processAutoCreateTable pCreateReq[iReq].type != TSDB_CHILD_TABLE"); goto _exit; } } string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); - uDebug("processAutoCreateTable :%s", string); _exit: + uDebug("auto created table return, sql json:%s", string); for (int i = 0; i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); @@ -433,17 +436,19 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { char* string = NULL; cJSON* json = NULL; - uDebug("processAlterTable called"); + uDebug("alter table data:%p", metaRsp); // decode void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); tDecoderInit(&decoder, data, len); if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) { + uError("tDecodeSVAlterTbReq error"); goto _exit; } json = cJSON_CreateObject(); if (json == NULL) { + uError("create json object failed"); goto _exit; } cJSON* type = cJSON_CreateString("alter"); @@ -543,9 +548,9 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { break; } string = cJSON_PrintUnformatted(json); - uDebug("processAlterTable :%s", string); _exit: + uDebug("alter table return, sql json:%s", string); cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -556,18 +561,20 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { SVDropStbReq req = {0}; char* string = NULL; cJSON* json = NULL; - uDebug("processDropSTable called"); + uDebug("processDropSTable data:%p", metaRsp); // decode void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); tDecoderInit(&decoder, data, len); if (tDecodeSVDropStbReq(&decoder, &req) < 0) { + uError("tDecodeSVDropStbReq failed"); goto _exit; } json = cJSON_CreateObject(); if (json == NULL) { + uError("create json object failed"); goto _exit; } cJSON* type = cJSON_CreateString("drop"); @@ -578,8 +585,8 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { cJSON_AddItemToObject(json, "tableName", tableName); string = cJSON_PrintUnformatted(json); - uDebug("processDropSTable :%s", string); _exit: + uDebug("processDropSTable return, sql json:%s", string); cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -587,18 +594,17 @@ _exit: static char* processDeleteTable(SMqMetaRsp* metaRsp) { SDeleteRes req = {0}; SDecoder coder = {0}; - int32_t code = TSDB_CODE_SUCCESS; cJSON* json = NULL; char* string = NULL; - uDebug("processDeleteTable called"); + uDebug("processDeleteTable data:%p", metaRsp); // decode and process req void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (tDecodeDeleteRes(&coder, &req) < 0) { - code = TSDB_CODE_INVALID_PARA; + uError("tDecodeDeleteRes failed"); goto _exit; } @@ -606,10 +612,10 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) { char sql[256] = {0}; snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); - uDebug("delete sql:%s\n", sql); json = cJSON_CreateObject(); if (json == NULL) { + uError("creaet json object failed"); goto _exit; } cJSON* type = cJSON_CreateString("delete"); @@ -618,8 +624,8 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) { cJSON_AddItemToObject(json, "sql", sqlJson); string = cJSON_PrintUnformatted(json); - uDebug("processDeleteTable :%s", string); _exit: + uDebug("processDeleteTable return, sql json:%s", string); cJSON_Delete(json); tDecoderClear(&coder); return string; @@ -631,17 +637,19 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { char* string = NULL; cJSON* json = NULL; - uDebug("processDropTable called"); + uDebug("processDropTable data:%p", metaRsp); // decode void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); tDecoderInit(&decoder, data, len); if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) { + uError("tDecodeSVDropTbBatchReq failed"); goto _exit; } json = cJSON_CreateObject(); if (json == NULL) { + uError("create json object failed"); goto _exit; } cJSON* type = cJSON_CreateString("drop"); @@ -661,26 +669,30 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { cJSON_AddItemToObject(json, "tableNameList", tableNameList); string = cJSON_PrintUnformatted(json); - uDebug("processDropTable :%s", string); _exit: + uDebug("processDropTable return, json sql:%s", string); cJSON_Delete(json); tDecoderClear(&decoder); return string; } static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { + if(taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } SVCreateStbReq req = {0}; SDecoder coder; SMCreateStbReq pReq = {0}; int32_t code = TSDB_CODE_SUCCESS; SRequestObj* pRequest = NULL; - uDebug("taosCreateStb called"); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); if (code != TSDB_CODE_SUCCESS) { - goto end; + terrno = code; + return code; } - + uDebug(LOG_ID_TAG" create stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -719,7 +731,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { pReq.source = TD_REQ_FROM_TAOX; pReq.igExists = true; - uDebug("taosCreateStb name:%s suid:%" PRId64 " processSuid:%" PRId64, req.name, req.suid, pReq.suid); + uDebug(LOG_ID_TAG" create stable name:%s suid:%" PRId64 " processSuid:%" PRId64, + LOG_ID_VALUE, req.name, req.suid, pReq.suid); STscObj* pTscObj = pRequest->pTscObj; SName tableName; tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); @@ -753,25 +766,32 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { taosMemoryFree(pCmdMsg.pMsg); end: + uDebug(LOG_ID_TAG" create stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); tDecoderClear(&coder); + terrno = code; return code; } static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { + if(taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } SVDropStbReq req = {0}; SDecoder coder = {0}; SMDropStbReq pReq = {0}; int32_t code = TSDB_CODE_SUCCESS; SRequestObj* pRequest = NULL; - uDebug("taosDropStb called"); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); if (code != TSDB_CODE_SUCCESS) { - goto end; + terrno = code; + return code; } + uDebug(LOG_ID_TAG" drop stable, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -805,7 +825,6 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { goto end; } if (code != TSDB_CODE_SUCCESS) { - uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", req.name); goto end; } pReq.suid = pTableMeta->uid; @@ -816,7 +835,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { pReq.source = TD_REQ_FROM_TAOX; // pReq.suid = processSuid(req.suid, pRequest->pDb); - uDebug("taosDropStb name:%s suid:%" PRId64 " new suid:%" PRId64, req.name, req.suid, pReq.suid); + uDebug(LOG_ID_TAG" drop stable name:%s suid:%" PRId64 " new suid:%" PRId64, + LOG_ID_VALUE, req.name, req.suid, pReq.suid); STscObj* pTscObj = pRequest->pTscObj; SName tableName = {0}; tNameExtractFullName(toName(pTscObj->acctId, pRequest->pDb, req.name, &tableName), pReq.name); @@ -850,8 +870,10 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { taosMemoryFree(pCmdMsg.pMsg); end: + uDebug(LOG_ID_TAG" drop stable return, msg:%s", LOG_ID_VALUE, tstrerror(code)); destroyRequest(pRequest); tDecoderClear(&coder); + terrno = code; return code; } @@ -867,6 +889,10 @@ static void destroyCreateTbReqBatch(void* data) { } static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { + if(taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } SVCreateTbBatchReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -874,12 +900,14 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { SQuery* pQuery = NULL; SHashObj* pVgroupHashmap = NULL; - uDebug("taosCreateTable called"); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); if (code != TSDB_CODE_SUCCESS) { - goto end; + terrno = code; + return code; } + uDebug(LOG_ID_TAG " create table, meta:%p, metaLen:%d", LOG_ID_VALUE, meta, metaLen); + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -939,12 +967,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { toName(pTscObj->acctId, pRequest->pDb, pCreateReq->ctb.stbName, &sName); code = catalogGetTableMeta(pCatalog, &conn, &sName, &pTableMeta); if (code != TSDB_CODE_SUCCESS) { - uError("taosCreateTable:catalogGetTableMeta failed. table name: %s", pCreateReq->ctb.stbName); goto end; } pCreateReq->ctb.suid = pTableMeta->uid; - uDebug("taosCreateTable name:%s sname:%s suid:%" PRId64 " new suid:%" PRId64, pCreateReq->name, - pCreateReq->ctb.stbName, oldSuid, pCreateReq->ctb.suid); for (int32_t i = 0; i < taosArrayGetSize(pCreateReq->ctb.tagName); i++) { char* tName = taosArrayGet(pCreateReq->ctb.tagName, i); @@ -999,6 +1024,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; end: + uDebug(LOG_ID_TAG" create table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -1011,6 +1037,7 @@ end: destroyRequest(pRequest); tDecoderClear(&coder); qDestroyQuery(pQuery); + terrno = code; return code; } @@ -1026,6 +1053,10 @@ static void destroyDropTbReqBatch(void* data) { } static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { + if(taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } SVDropTbBatchReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1033,11 +1064,13 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { SQuery* pQuery = NULL; SHashObj* pVgroupHashmap = NULL; - uDebug("taosDropTable called"); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); if (code != TSDB_CODE_SUCCESS) { - goto end; + terrno = code; + return code; } + uDebug(LOG_ID_TAG " drop table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen); + pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -1095,13 +1128,12 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { continue; } if (code != TSDB_CODE_SUCCESS) { - uError("taosDropTable:catalogGetTableMeta failed. table name: %s", pDropReq->name); goto end; } tb_uid_t oldSuid = pDropReq->suid; pDropReq->suid = pTableMeta->suid; taosMemoryFreeClear(pTableMeta); - uDebug("taosDropTable name:%s suid:%" PRId64 " new suid:%" PRId64, pDropReq->name, oldSuid, pDropReq->suid); + uDebug(LOG_ID_TAG" drop table name:%s suid:%" PRId64 " new suid:%" PRId64, LOG_ID_VALUE, pDropReq->name, oldSuid, pDropReq->suid); taosArrayPush(pRequest->tableList, &pName); SVgroupDropTableBatch* pTableBatch = taosHashGet(pVgroupHashmap, &pInfo.vgId, sizeof(pInfo.vgId)); @@ -1144,10 +1176,12 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; end: + uDebug(LOG_ID_TAG" drop table return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); tDecoderClear(&coder); qDestroyQuery(pQuery); + terrno = code; return code; } @@ -1184,11 +1218,17 @@ end: //} static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { - SDeleteRes req = {0}; - SDecoder coder = {0}; + if(taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + SDeleteRes req = {0}; + SDecoder coder = {0}; + char sql[256] = {0}; int32_t code = TSDB_CODE_SUCCESS; - uDebug("taosDeleteData called"); + uDebug("connId:0x%"PRIx64" delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen); + // decode and process req void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); int32_t len = metaLen - sizeof(SMsgHead); @@ -1199,10 +1239,8 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } // getTbName(req.tableFName); - char sql[256] = {0}; snprintf(sql, sizeof(sql), "delete from `%s` where `%s` >= %" PRId64 " and `%s` <= %" PRId64, req.tableFName, req.tsColName, req.skey, req.tsColName, req.ekey); - uDebug("delete sql:%s\n", sql); TAOS_RES* res = taos_query(taos, sql); SRequestObj* pRequest = (SRequestObj*)res; @@ -1213,11 +1251,17 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { taos_free_result(res); end: + uDebug("connId:0x%"PRIx64" delete data sql:%s, code:%s", *(int64_t*)taos, sql, tstrerror(code)); tDecoderClear(&coder); + terrno = code; return code; } static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { + if(taos == NULL || meta == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } SVAlterTbReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1226,13 +1270,12 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { SArray* pArray = NULL; SVgDataBlocks* pVgData = NULL; - uDebug("taosAlterTable called"); code = buildRequest(*(int64_t*)taos, "", 0, NULL, false, &pRequest, 0); - if (code != TSDB_CODE_SUCCESS) { - goto end; + terrno = code; + return code; } - + uDebug(LOG_ID_TAG " alter table, meta:%p, len:%d", LOG_ID_VALUE, meta, metaLen); pRequest->syncQuery = true; if (!pRequest->pDb) { code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; @@ -1272,7 +1315,6 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } - uDebug("taosAlterTable name:%s", req.tbName); pArray = taosArrayInit(1, sizeof(void*)); if (NULL == pArray) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1327,36 +1369,38 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } } end: + uDebug(LOG_ID_TAG " alter table return, meta:%p, len:%d, msg:%s", LOG_ID_VALUE, meta, metaLen, tstrerror(code)); taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); destroyRequest(pRequest); tDecoderClear(&coder); qDestroyQuery(pQuery); + terrno = code; return code; } int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields, int numFields) { if (!taos || !pData || !tbname) { - return TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; SQuery* pQuery = NULL; SHashObj* pVgHash = NULL; - uDebug("taos_write_raw_block_with_fields called"); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); if (!pRequest) { - uError("WriteRaw:createRequest error request is null"); - code = terrno; - goto end; + return terrno; } + uDebug(LOG_ID_TAG " write raw block with field, rows:%d, pData:%p, tbname:%s, fields:%p, numFields:%d", + LOG_ID_VALUE, rows, pData, tbname, fields, numFields); + pRequest->syncQuery = true; if (!pRequest->pDb) { - uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; } @@ -1365,11 +1409,9 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.tname, tbname, sizeof(pName.tname)); - uDebug("taos_write_raw_block_with_fields name:%s", tbname); struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw: get gatlog error"); goto end; } @@ -1382,13 +1424,11 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch SVgroupInfo vgData = {0}; code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname); goto end; } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); goto end; } // uError("td23101 0vgId:%d, vgId:%d, name:%s, uid:%"PRIu64, vgData.vgId, pTableMeta->vgId, tbname, pTableMeta->uid); @@ -1400,18 +1440,14 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); - // uError("td23101 1vgId:%d, numEps:%d, name:%s, uid:%"PRIu64, vgData.vgId, vgData.epSet.numOfEps, tbname, - // pTableMeta->uid); code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:rawBlockBindData failed"); goto end; } code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { - uError("smlBuildOutput failed"); goto end; } @@ -1419,33 +1455,34 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch code = pRequest->code; end: + uDebug(LOG_ID_TAG " write raw block with field return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); + terrno = code; return code; } int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) { if (!taos || !pData || !tbname) { - return TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; SQuery* pQuery = NULL; SHashObj* pVgHash = NULL; - uDebug("taos_write_raw_block called"); SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); if (!pRequest) { - uError("WriteRaw:createRequest error request is null"); - code = terrno; - goto end; + return terrno; } + uDebug(LOG_ID_TAG " write raw block, rows:%d, pData:%p, tbname:%s", LOG_ID_VALUE, rows, pData, tbname); + pRequest->syncQuery = true; if (!pRequest->pDb) { - uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; } @@ -1454,11 +1491,9 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname)); tstrncpy(pName.tname, tbname, sizeof(pName.tname)); - uDebug("taos_write_raw_block name:%s", tbname); struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw: get gatlog error"); goto end; } @@ -1471,13 +1506,11 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) SVgroupInfo vgData = {0}; code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgData); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbname); goto end; } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbname); goto end; } pQuery = smlInitHandle(); @@ -1490,13 +1523,11 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:rawBlockBindData failed"); goto end; } code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { - uError("smlBuildOutput failed"); goto end; } @@ -1504,29 +1535,34 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) code = pRequest->code; end: + uDebug(LOG_ID_TAG " write raw block return, msg:%s", LOG_ID_VALUE, tstrerror(code)); taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); + terrno = code; return code; } static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { + if(taos == NULL || data == NULL){ + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } int32_t code = TSDB_CODE_SUCCESS; SHashObj* pVgHash = NULL; SQuery* pQuery = NULL; SMqRspObj rspObj = {0}; SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; - uDebug("tmqWriteRawDataImpl called"); terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); if (!pRequest) { - uError("WriteRaw:createRequest error request is null"); return terrno; } + uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; rspObj.resIter = -1; rspObj.resType = RES_TYPE__TMQ; @@ -1534,13 +1570,11 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { tDecoderInit(&decoder, data, dataLen); code = tDecodeMqDataRsp(&decoder, &rspObj.rsp); if (code != 0) { - uError("WriteRaw:decode smqDataRsp error"); code = TSDB_CODE_INVALID_MSG; goto end; } if (!pRequest->pDb) { - uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; } @@ -1548,7 +1582,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw: get gatlog error"); goto end; } @@ -1564,41 +1597,35 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { goto end; } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - uDebug("tmqWriteRawDataImpl raw data block num:%d", rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { - uError("WriteRaw:no schema, iter:%d", rspObj.resIter); goto end; } const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); if (!tbName) { - uError("WriteRaw: tbname is null"); code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } - uDebug("tmqWriteRawDataImpl raw data tbname:%s", tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbName); code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); - code = TSDB_CODE_SUCCESS; - continue; - } +// if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { +// uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); +// code = TSDB_CODE_SUCCESS; +// continue; +// } if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName); goto end; } SVgroupInfo vg; code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); goto end; } @@ -1620,7 +1647,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:rawBlockBindData failed"); goto end; } taosMemoryFreeClear(pTableMeta); @@ -1628,7 +1654,6 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { - uError("smlBuildOutput failed"); goto end; } @@ -1636,16 +1661,22 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = pRequest->code; end: + uDebug(LOG_ID_TAG " write raw data return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteMqDataRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); + terrno = code; return code; } static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { + if(taos == NULL || data == NULL){ + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } int32_t code = TSDB_CODE_SUCCESS; SHashObj* pVgHash = NULL; SQuery* pQuery = NULL; @@ -1653,15 +1684,13 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; SVCreateTbReq* pCreateReqDst = NULL; - uDebug("tmqWriteRawMetaDataImpl called"); terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); if (!pRequest) { - uError("WriteRaw:createRequest error request is null"); return terrno; } - + uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; rspObj.resIter = -1; rspObj.resType = RES_TYPE__TMQ_METADATA; @@ -1669,13 +1698,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tDecoderInit(&decoder, data, dataLen); code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp); if (code != 0) { - uError("WriteRaw:decode smqDataRsp error"); code = TSDB_CODE_INVALID_MSG; goto end; } if (!pRequest->pDb) { - uError("WriteRaw:not use db"); code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; goto end; } @@ -1683,7 +1710,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) struct SCatalog* pCatalog = NULL; code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw: get gatlog error"); goto end; } @@ -1700,22 +1726,20 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - uDebug("tmqWriteRawMetaDataImpl raw data block num:%d", rspObj.rsp.blockNum); + uDebug(LOG_ID_TAG" write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum); while (++rspObj.resIter < rspObj.rsp.blockNum) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); if (!rspObj.rsp.withSchema) { - uError("WriteRaw:no schema, iter:%d", rspObj.resIter); goto end; } const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); if (!tbName) { - uError("WriteRaw: tbname is null"); code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } - uDebug("tmqWriteRawMetaDataImpl raw data tbname:%s\n", tbName); + uDebug(LOG_ID_TAG" write raw metadata block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; strcpy(pName.dbname, pRequest->pDb); strcpy(pName.tname, tbName); @@ -1731,13 +1755,11 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); - uError("WriteRaw: tDecodeSVCreateTbReq error"); code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } if (pCreateReq.type != TSDB_CHILD_TABLE) { - uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName); code = TSDB_CODE_TSC_INVALID_VALUE; tDecoderClear(&decoderTmp); tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); @@ -1757,7 +1779,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SVgroupInfo vg; code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); goto end; } @@ -1765,13 +1786,12 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) strcpy(pName.tname, pCreateReqDst->ctb.stbName); } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); - code = TSDB_CODE_SUCCESS; - continue; - } +// if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { +// uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); +// code = TSDB_CODE_SUCCESS; +// continue; +// } if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName); goto end; } @@ -1798,7 +1818,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { - uError("WriteRaw:rawBlockBindData failed"); goto end; } pCreateReqDst = NULL; @@ -1807,7 +1826,6 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { - uError("smlBuildOutput failed"); goto end; } @@ -1815,6 +1833,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) code = pRequest->code; end: + uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); tDeleteSTaosxRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); @@ -1825,12 +1844,13 @@ end: tdDestroySVCreateTbReq(pCreateReqDst); taosMemoryFree(pCreateReqDst); } + terrno = code; return code; } char* tmq_get_json_meta(TAOS_RES* res) { if (res == NULL) return NULL; - uDebug("tmq_get_json_meta called"); + uDebug("tmq_get_json_meta res:%p", res); if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res)) { return NULL; } @@ -1865,16 +1885,16 @@ char* tmq_get_json_meta(TAOS_RES* res) { void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { - uDebug("tmq_get_raw called"); if (!raw || !res) { - return TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; raw->raw = pMetaRspObj->metaRsp.metaRsp; raw->raw_len = pMetaRspObj->metaRsp.metaRspLen; raw->raw_type = pMetaRspObj->metaRsp.resMsgType; - uDebug("tmq_get_raw meta"); + uDebug("tmq get raw type meta:%p", raw); } else if (TD_RES_TMQ(res)) { SMqRspObj* rspObj = ((SMqRspObj*)res); @@ -1894,7 +1914,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw = buf; raw->raw_len = len; raw->raw_type = RES_TYPE__TMQ; - uDebug("tmq_get_raw data"); + uDebug("tmq get raw type data:%p", raw); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); @@ -1914,25 +1934,25 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { raw->raw = buf; raw->raw_len = len; raw->raw_type = RES_TYPE__TMQ_METADATA; - uDebug("tmq_get_raw meta data"); + uDebug("tmq get raw type metadata:%p", raw); } else { - uError("tmq_get_raw error:%d", *(int8_t*)res); - return TSDB_CODE_TMQ_INVALID_MSG; + uError("tmq get raw error type:%d", *(int8_t*)res); + terrno = TSDB_CODE_TMQ_INVALID_MSG; + return terrno; } return TSDB_CODE_SUCCESS; } void tmq_free_raw(tmq_raw_data raw) { - uDebug("tmq_free_raw raw_type:%d", raw.raw_type); + uDebug("tmq free raw data type:%d", raw.raw_type); if (raw.raw_type == RES_TYPE__TMQ || raw.raw_type == RES_TYPE__TMQ_METADATA) { taosMemoryFree(raw.raw); } } int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { - uDebug("tmq_write_raw called"); if (!taos) { - return TSDB_CODE_INVALID_PARA; + goto end; } if (raw.raw_type == TDMT_VND_CREATE_STB) { @@ -1954,5 +1974,8 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { } else if (raw.raw_type == RES_TYPE__TMQ_METADATA) { return tmqWriteRawMetaDataImpl(taos, raw.raw, raw.raw_len); } - return TSDB_CODE_INVALID_PARA; + +end: + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c5f077d57f..288919d709 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -307,7 +307,6 @@ typedef struct STagScanInfo { SSDataBlock* pRes; SColMatchInfo matchInfo; int32_t curPos; - SLimitNode* pSlimit; SReadHandle readHandle; STableListInfo* pTableListInfo; uint64_t suid; @@ -318,6 +317,7 @@ typedef struct STagScanInfo { SArray* aUidTags; // SArray SArray* aFilterIdxs; // SArray SStorageAPI* pStorageAPI; + SLimitInfo limitInfo; } STagScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8dbb8a979e..2388e3dac7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3060,7 +3060,12 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); } pRes->info.rows = count; - pOperator->resultInfo.totalRows += count; + + bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo); + if (bLimitReached) { + setOperatorCompleted(pOperator); + } + pOperator->resultInfo.totalRows += pRes->info.rows; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -3094,28 +3099,20 @@ static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) { if (++pInfo->curPos >= size) { setOperatorCompleted(pOperator); } - // each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason. - if (pInfo->pSlimit != NULL) { - if (pInfo->curPos < pInfo->pSlimit->offset) { - continue; - } - pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name)); - if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) { - setOperatorCompleted(pOperator); - } - break; - } } + pRes->info.rows = count; pAPI->metaReaderFn.clearReader(&mr); - + bool bLimitReached = applyLimitOffset(&pInfo->limitInfo, pRes, pTaskInfo); + if (bLimitReached) { + setOperatorCompleted(pOperator); + } // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count); if (pOperator->status == OP_EXEC_DONE) { setTaskStatus(pTaskInfo, TASK_COMPLETED); } - pRes->info.rows = count; - pOperator->resultInfo.totalRows += count; + pOperator->resultInfo.totalRows += pRes->info.rows; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -3169,8 +3166,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; - pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group + initLimitInfo(pPhyNode->node.pLimit, pPhyNode->node.pSlimit, &pInfo->limitInfo); setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); initResultSizeInfo(&pOperator->resultInfo, 4096); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 430e69f46f..8d4c042960 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2730,36 +2730,6 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { return true; } -static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) { - SLogicNode* pNode = pTableScanNode->pParent; - while (NULL != pNode) { - if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) || - QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) { - return NULL; - } - if (NULL != pNode->pSlimit) { - return pNode; - } - pNode = pNode->pParent; - } - return NULL; -} - -static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) { - if (NULL != pTableScanNode->pSlimit) { - return; - } - - SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode); - if (NULL != pNode) { - // TODO: only set the slimit now. push down slimit later - pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit); - ((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset; - ((SLimitNode*)pTableScanNode->pSlimit)->offset = 0; - } - return; -} - static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized); if (NULL == pScanNode) { @@ -2795,13 +2765,6 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp pScanNode->node.pTargets = pScanTargets; } - int32_t code = replaceLogicNode(pLogicSubplan, pAgg, (SLogicNode*)pScanNode); - if (TSDB_CODE_SUCCESS == code) { - NODES_CLEAR_LIST(pAgg->pChildren); - } - nodesDestroyNode((SNode*)pAgg); - tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode); - pScanNode->onlyMetaCtbIdx = false; pCxt->optimized = true; diff --git a/tests/develop-test/2-query/tag_scan.py b/tests/develop-test/2-query/tag_scan.py new file mode 100644 index 0000000000..a853e497dd --- /dev/null +++ b/tests/develop-test/2-query/tag_scan.py @@ -0,0 +1,206 @@ +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + +class TDTestCase: + def caseDescription(self): + ''' + case1: [TD-11204]Difference improvement that can ignore negative + ''' + return + + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + self._conn = conn + + def restartTaosd(self, index=1, dbname="db"): + tdDnodes.stop(index) + tdDnodes.startWithoutSleep(index) + tdSql.execute(f"use tagscan") + + + def runSingleVgroup(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists tagscan2") + tdSql.execute("create database if not exists tagscan2 vgroups 1") + tdSql.execute('use tagscan2') + tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);') + + tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);") + + tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);") + + tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);") + + tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);") + + tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);") + + tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);") + + tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.query('select tags t1,t2 from stb1 order by t1,t2;') + tdSql.checkRows(6) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, '1') + tdSql.checkData(1, 0, 2) + tdSql.checkData(1, 1, '2') + tdSql.checkData(2, 0, 3) + tdSql.checkData(2, 1, '3') + tdSql.checkData(3, 0, 4) + tdSql.checkData(3, 1, '4') + tdSql.checkData(4, 0, 5) + tdSql.checkData(4, 1, '5') + tdSql.checkData(5, 0, 5) + tdSql.checkData(5, 1, '5') + + tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;') + tdSql.checkRows(3) + tdSql.checkData(0, 0, 'tb3') + + tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;') + tdSql.checkRows(3) + tdSql.checkData(0, 0, 3) + tdSql.checkData(0, 1, '3') + tdSql.checkData(1, 0, 4) + tdSql.checkData(1, 1, '4') + tdSql.checkData(2, 0, 5) + tdSql.checkData(2, 1, '5') + + tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;') + tdSql.checkRows(3) + + + tdSql.execute('drop database tagscan2') + def runMultiVgroups(self): + print("running {}".format(__file__)) + tdSql.execute("drop database if exists tagscan") + tdSql.execute("create database if not exists tagscan") + tdSql.execute('use tagscan') + tdSql.execute('create table stb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, c6 float, c7 double, c8 binary(10), c9 nchar(10), c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned) TAGS(t1 int, t2 binary(10), t3 double);') + + tdSql.execute("create table tb1 using stb1 tags(1,'1',1.0);") + + tdSql.execute("create table tb2 using stb1 tags(2,'2',2.0);") + + tdSql.execute("create table tb3 using stb1 tags(3,'3',3.0);") + + tdSql.execute("create table tb4 using stb1 tags(4,'4',4.0);") + + tdSql.execute("create table tb5 using stb1 tags(5,'5',5.0);") + + tdSql.execute("create table tb6 using stb1 tags(5,'5',5.0);") + + tdSql.execute('insert into tb1 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb2 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb3 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb4 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb5 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.execute('insert into tb6 values (\'2021-11-11 09:00:00\',true,1,1,1,1,1,1,"123","1234",1,1,1,1);') + + tdSql.query('select tags t1,t2 from stb1 order by t1,t2;') + tdSql.checkRows(6) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, '1') + tdSql.checkData(1, 0, 2) + tdSql.checkData(1, 1, '2') + tdSql.checkData(2, 0, 3) + tdSql.checkData(2, 1, '3') + tdSql.checkData(3, 0, 4) + tdSql.checkData(3, 1, '4') + tdSql.checkData(4, 0, 5) + tdSql.checkData(4, 1, '5') + tdSql.checkData(5, 0, 5) + tdSql.checkData(5, 1, '5') + + tdSql.query('select * from (select tags t1,t2 from stb1 group by t1,t2 slimit 2,3) order by t1,t2;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tags tbname tn from stb1 group by tbname slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tbname tn from stb1 group by tbname slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tbname tn from stb1 group by tbname order by tbname limit 2,3) order by tn;') + tdSql.checkRows(3) + tdSql.checkData(0, 0, 'tb3') + + tdSql.query('select * from (select distinct tbname tn from stb1 limit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select distinct tbname tn, t1,t2 from stb1 limit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tags t1,t2 from stb1 order by t1, t2 limit 2,3) order by t1, t2;') + tdSql.checkRows(3) + tdSql.checkData(0, 0, 3) + tdSql.checkData(0, 1, '3') + tdSql.checkData(1, 0, 4) + tdSql.checkData(1, 1, '4') + tdSql.checkData(2, 0, 5) + tdSql.checkData(2, 1, '5') + + tdSql.query('select * from (select tbname tn, t1,t2 from stb1 partition by tbname slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;') + tdSql.checkRows(3) + + tdSql.query('select * from (select tags tbname tn, t1,t2 from stb1 group by tbname, t1,t2 slimit 2,3) order by tn;') + tdSql.checkRows(3) + + + tdSql.execute('drop database tagscan') + + def run(self): + self.runMultiVgroups() + self.runSingleVgroup() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e83586ca09..85cb8306cb 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1271,6 +1271,7 @@ #develop test ,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py ,,n,develop-test,python3 ./test.py -f 2-query/ts-range.py +,,n,develop-test,python3 ./test.py -f 2-query/tag_scan.py ,,n,develop-test,python3 ./test.py -f 2-query/show_create_db.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/custom_col_tag.py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index ea042829d6..9c45c09715 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -140,6 +140,7 @@ class TDCom: self.range_count = 5 self.default_interval = 5 self.stream_timeout = 12 + self.create_stream_sleep = 0.5 self.record_history_ts = str() self.precision = "ms" self.date_time = self.genTs(precision=self.precision)[0] @@ -881,6 +882,7 @@ class TDCom: stream_options += f" ignore update 0" if not use_except: tdSql.execute(f'create stream if not exists {stream_name} trigger at_once {stream_options} {fill_history} into {des_table} {subtable} as {source_sql} {fill};') + time.sleep(self.create_stream_sleep) return None else: return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table} {subtable} as {source_sql} {fill};' @@ -906,6 +908,7 @@ class TDCom: stream_options += f" ignore update 0" if not use_except: tdSql.execute(f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};') + time.sleep(self.create_stream_sleep) return None else: return f'create stream if not exists {stream_name} {stream_options} {fill_history} into {des_table}{stb_field_name} {tags} {subtable} as {source_sql} {fill};' @@ -1566,8 +1569,8 @@ class TDCom: res1 = tdSql.queryResult tdSql.query(sql2) res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult + tdSql.sql = sql1 new_list = list() - if tag_value_list: res1 = self.float_handle(res1) res2 = self.float_handle(res2) @@ -1602,6 +1605,7 @@ class TDCom: tdSql.query(sql2) # res2 = tdSql.queryResult res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult + tdSql.sql = sql1 if tag_value_list: res1 = self.float_handle(res1) @@ -1643,6 +1647,7 @@ class TDCom: tdSql.query(sql2) # res2 = tdSql.queryResult res2 = self.cast_query_data(tdSql.queryResult) if tag_value_list or use_exist_stb else tdSql.queryResult + tdSql.sql = sql1 if tag_value_list: res1 = self.float_handle(res1) diff --git a/tests/system-test/2-query/limit.py b/tests/system-test/2-query/limit.py index fb5595a8be..961cff5087 100644 --- a/tests/system-test/2-query/limit.py +++ b/tests/system-test/2-query/limit.py @@ -338,11 +338,38 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") + # + def checkVGroups(self): + + # db2 + tdSql.execute("create database db2 vgroups 2;") + tdSql.execute("use db2;") + tdSql.execute("create table st(ts timestamp, age int) tags(area int);") + tdSql.execute("create table t1 using st tags(1);") + tdSql.query("select distinct(tbname) from st limit 1 offset 100;") + tdSql.checkRows(0) + tdLog.info("check db2 vgroups 2 limit 1 offset 100 successfully!") + + + # db1 + tdSql.execute("create database db1 vgroups 1;") + tdSql.execute("use db1;") + tdSql.execute("create table st(ts timestamp, age int) tags(area int);") + tdSql.execute("create table t1 using st tags(1);") + tdSql.query("select distinct(tbname) from st limit 1 offset 100;") + tdSql.checkRows(0) + tdLog.info("check db1 vgroups 1 limit 1 offset 100 successfully!") + + def run(self): # tdSql.prepare() self.prepareTestEnv() self.tmqCase1() + # one vgroup diff more than one vgroup check + self.checkVGroups() + + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed")