diff --git a/cmake/rust-bindings_CMakeLists.txt.in b/cmake/rust-bindings_CMakeLists.txt.in deleted file mode 100644 index d16e86139b..0000000000 --- a/cmake/rust-bindings_CMakeLists.txt.in +++ /dev/null @@ -1,12 +0,0 @@ - -# rust-bindings -ExternalProject_Add(rust-bindings - GIT_REPOSITORY https://github.com/songtianyi/tdengine-rust-bindings.git - GIT_TAG 7ed7a97 - SOURCE_DIR "${TD_SOURCE_DIR}/examples/rust" - BINARY_DIR "${TD_SOURCE_DIR}/examples/rust" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" - ) diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 294b59fe95..a1eec81ee0 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -105,11 +105,6 @@ if(${BUILD_WITH_SQLITE}) cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif(${BUILD_WITH_SQLITE}) -# rust-bindings -if(${RUST_BINDINGS}) - cat("${TD_SUPPORT_DIR}/rust-bindings_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif(${RUST_BINDINGS}) - # lucene if(${BUILD_WITH_LUCENE}) cat("${TD_SUPPORT_DIR}/lucene_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) diff --git a/docs/zh/05-get-started/03-package.md b/docs/zh/05-get-started/03-package.md index 9cd2446ba9..c1a67f0182 100644 --- a/docs/zh/05-get-started/03-package.md +++ b/docs/zh/05-get-started/03-package.md @@ -7,12 +7,13 @@ import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; import PkgListV3 from "/components/PkgListV3"; +您可以[用 Docker 立即体验](../../get-started/docker/) TDengine。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. + TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。目前 taosAdapter 仅在 Linux 系统上安装和运行,后续将支持 Windows、macOS 等系统。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../../reference/taosadapter/) 提供 [RESTful 接口](../../reference/rest-api/)。 -为方便使用,标准的服务端安装包包含了 taos、taosd、taosAdapter、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。 - -在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。TDengine 也提供 Windows x64 平台的安装包。您也可以[用 Docker 立即体验](../../get-started/docker/)。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。如果您希望对 TDengine 贡献代码或对内部实现感兴趣,请参考我们的 [TDengine GitHub 主页](https://github.com/taosdata/TDengine) 下载源码构建和安装. +为方便使用,标准的服务端安装包包含了 taosd、taosAdapter、taosc、taos、taosdump、taosBenchmark、TDinsight 安装脚本和示例代码;如果您只需要用到服务端程序和客户端连接的 C/C++ 语言支持,也可以仅下载 lite 版本的安装包。 +在 Linux 系统上,TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包,也支持通过 `apt-get` 工具从线上进行安装。需要注意的是,rpm 和 deb 包不含 taosdump 和 TDinsight 安装脚本,这些工具需要通过安装 taosTool 包获得。TDengine 也提供 Windows x64 平台的安装包。 ## 安装 diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index c487835e2d..25f37121e8 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -807,300 +807,9 @@ SHOW SUBSCRIPTIONS; 以下是各语言的完整示例代码。 + - -```c -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include -#include -#include -#include -#include -#include "taos.h" - -static int running = 1; -static char dbName[64] = "tmqdb"; -static char stbName[64] = "stb"; -static char topicName[64] = "topicname"; - -static int32_t msg_process(TAOS_RES* msg) { - char buf[1024]; - int32_t rows = 0; - - const char* topicName = tmq_get_topic_name(msg); - const char* dbName = tmq_get_db_name(msg); - int32_t vgroupId = tmq_get_vgroup_id(msg); - - printf("topic: %s\n", topicName); - printf("db: %s\n", dbName); - printf("vgroup id: %d\n", vgroupId); - - while (1) { - TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; - - TAOS_FIELD* fields = taos_fetch_fields(msg); - int32_t numOfFields = taos_field_count(msg); - int32_t* length = taos_fetch_lengths(msg); - int32_t precision = taos_result_precision(msg); - const char* tbName = tmq_get_table_name(msg); - rows++; - taos_print_row(buf, row, fields, numOfFields); - printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf); - } - - return rows; -} - -static int32_t init_env() { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - TAOS_RES* pRes; - // drop database if exists - printf("create database\n"); - pRes = taos_query(pConn, "drop database if exists tmqdb"); - if (taos_errno(pRes) != 0) { - printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - // create database - pRes = taos_query(pConn, "create database tmqdb"); - if (taos_errno(pRes) != 0) { - printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - // create super table - printf("create super table\n"); - pRes = taos_query( - pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - // create sub tables - printf("create sub tables\n"); - pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - // insert data - printf("insert data into sub tables\n"); - pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')"); - if (taos_errno(pRes) != 0) { - printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - taos_close(pConn); - return 0; -} - -int32_t create_topic() { - printf("create topic\n"); - TAOS_RES* pRes; - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - return -1; - } - - pRes = taos_query(pConn, "use tmqdb"); - if (taos_errno(pRes) != 0) { - printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"); - if (taos_errno(pRes) != 0) { - printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); - - taos_close(pConn); - return 0; -} - -void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { - printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); -} - -tmq_t* build_consumer() { - tmq_conf_res_t code; - tmq_conf_t* conf = tmq_conf_new(); - code = tmq_conf_set(conf, "enable.auto.commit", "true"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "group.id", "cgrpName"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "client.id", "user defined name"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "td.connect.user", "root"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "experimental.snapshot.enable", "true"); - if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "msg.with.table.name", "true"); - if (TMQ_CONF_OK != code) return NULL; - - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); - - tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); - tmq_conf_destroy(conf); - return tmq; -} - -tmq_list_t* build_topic_list() { - tmq_list_t* topicList = tmq_list_new(); - int32_t code = tmq_list_append(topicList, "topicname"); - if (code) { - return NULL; - } - return topicList; -} - -void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) { - int32_t code; - - if ((code = tmq_subscribe(tmq, topicList))) { - fprintf(stderr, "%% Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); - return; - } - - int32_t totalRows = 0; - int32_t msgCnt = 0; - int32_t timeout = 5000; - while (running) { - TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); - if (tmqmsg) { - msgCnt++; - totalRows += msg_process(tmqmsg); - taos_free_result(tmqmsg); - /*} else {*/ - /*break;*/ - } - } - - fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); -} - -int main(int argc, char* argv[]) { - int32_t code; - - if (init_env() < 0) { - return -1; - } - - if (create_topic() < 0) { - return -1; - } - - tmq_t* tmq = build_consumer(); - if (NULL == tmq) { - fprintf(stderr, "%% build_consumer() fail!\n"); - return -1; - } - - tmq_list_t* topic_list = build_topic_list(); - if (NULL == topic_list) { - return -1; - } - - basic_consume_loop(tmq, topic_list); - - code = tmq_unsubscribe(tmq); - if (code) { - fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code)); - } else { - fprintf(stderr, "%% unsubscribe\n"); - } - - code = tmq_consumer_close(tmq); - if (code) { - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - } else { - fprintf(stderr, "%% Consumer closed\n"); - } - - return 0; -} - -``` - -[查看源码](https://github.com/taosdata/TDengine/blob/develop/examples/c/tmq.c) + @@ -1116,22 +825,7 @@ int main(int argc, char* argv[]) { - -```python -import taos -from taos.tmq import TaosConsumer - -import taos -from taos.tmq import * -consumer = TaosConsumer('topic_ctb_column', group_id='vg2') -for msg in consumer: - for row in msg: - print(row) - -``` - -[查看源码](https://github.com/taosdata/TDengine/blob/develop/docs/examples/python/tmq_example.py) - + diff --git a/docs/zh/07-develop/_sub_c.mdx b/docs/zh/07-develop/_sub_c.mdx index da492a0269..b8f73a8ff1 100644 --- a/docs/zh/07-develop/_sub_c.mdx +++ b/docs/zh/07-develop/_sub_c.mdx @@ -1,3 +1,3 @@ ```c -{{#include docs/examples/c/subscribe_demo.c}} -``` \ No newline at end of file +{{#include docs/examples/c/tmq-example.c}} +``` diff --git a/docs/zh/07-develop/_sub_python.mdx b/docs/zh/07-develop/_sub_python.mdx index 490b76fca6..1309da5b41 100644 --- a/docs/zh/07-develop/_sub_python.mdx +++ b/docs/zh/07-develop/_sub_python.mdx @@ -1,3 +1,3 @@ ```py -{{#include docs/examples/python/subscribe_demo.py}} -``` \ No newline at end of file +{{#include docs/examples/python/tmq_example.py}} +``` diff --git a/examples/rust/.gitignore b/examples/rust/.gitignore new file mode 100644 index 0000000000..96ef6c0b94 --- /dev/null +++ b/examples/rust/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml new file mode 100644 index 0000000000..1ed73e2fde --- /dev/null +++ b/examples/rust/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rust" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +taos = "*" + +[dev-dependencies] +chrono = "0.4" +itertools = "0.10.3" +pretty_env_logger = "0.4.0" +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["full"] } +anyhow = "1" diff --git a/examples/rust/examples/bind-tags.rs b/examples/rust/examples/bind-tags.rs new file mode 100644 index 0000000000..a1f7286625 --- /dev/null +++ b/examples/rust/examples/bind-tags.rs @@ -0,0 +1,80 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test", + "create database test keep 36500", + "use test", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100)) tags(t1 varchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare( + "insert into ? using tb1 tags(?) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )?; + stmt.set_tbname("d0")?; + stmt.set_tags(&[Value::VarChar("涛思".to_string())])?; + + let params = vec![ + ColumnView::from_millis_timestamp(vec![164000000000]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + t1: serde_json::Value, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/bind.rs b/examples/rust/examples/bind.rs new file mode 100644 index 0000000000..194938a319 --- /dev/null +++ b/examples/rust/examples/bind.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use serde::Deserialize; +use taos::*; + +#[tokio::main] +async fn main() -> Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build()?; + taos.exec_many([ + "drop database if exists test_bindable", + "create database test_bindable keep 36500", + "use test_bindable", + "create table tb1 (ts timestamp, c1 bool, c2 tinyint, c3 smallint, c4 int, c5 bigint, + c6 tinyint unsigned, c7 smallint unsigned, c8 int unsigned, c9 bigint unsigned, + c10 float, c11 double, c12 varchar(100), c13 nchar(100))", + ]) + .await?; + let mut stmt = Stmt::init(&taos)?; + stmt.prepare("insert into tb1 values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")?; + let params = vec![ + ColumnView::from_millis_timestamp(vec![0]), + ColumnView::from_bools(vec![true]), + ColumnView::from_tiny_ints(vec![i8::MAX]), + ColumnView::from_small_ints(vec![i16::MAX]), + ColumnView::from_ints(vec![i32::MAX]), + ColumnView::from_big_ints(vec![i64::MAX]), + ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]), + ColumnView::from_unsigned_small_ints(vec![u16::MAX]), + ColumnView::from_unsigned_ints(vec![u32::MAX]), + ColumnView::from_unsigned_big_ints(vec![u64::MAX]), + ColumnView::from_floats(vec![f32::MAX]), + ColumnView::from_doubles(vec![f64::MAX]), + ColumnView::from_varchar(vec!["ABC"]), + ColumnView::from_nchar(vec!["涛思数据"]), + ]; + let rows = stmt.bind(¶ms)?.add_batch()?.execute()?; + assert_eq!(rows, 1); + + #[derive(Debug, Deserialize)] + #[allow(dead_code)] + struct Row { + ts: String, + c1: bool, + c2: i8, + c3: i16, + c4: i32, + c5: i64, + c6: u8, + c7: u16, + c8: u32, + c9: u64, + c10: Option, + c11: f64, + c12: String, + c13: String, + } + + let rows: Vec = taos + .query("select * from tb1") + .await? + .deserialize() + .try_collect() + .await?; + let row = &rows[0]; + dbg!(&row); + assert_eq!(row.c5, i64::MAX); + assert_eq!(row.c8, u32::MAX); + assert_eq!(row.c9, u64::MAX); + assert_eq!(row.c10.unwrap(), f32::MAX); + // assert_eq!(row.c11, f64::MAX); + assert_eq!(row.c12, "ABC"); + assert_eq!(row.c13, "涛思数据"); + + Ok(()) +} diff --git a/examples/rust/examples/query.rs b/examples/rust/examples/query.rs new file mode 100644 index 0000000000..016b291abc --- /dev/null +++ b/examples/rust/examples/query.rs @@ -0,0 +1,106 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://"; + + let opts = PoolBuilder::new() + .max_size(5000) // max connections + .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection + .min_idle(Some(1000)) // minimal idle connections + .connection_timeout(Duration::from_secs(2)); + + let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?; + + let taos = pool.get()?; + + let db = "query"; + + // prepare database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + let inserted = taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))", + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + + assert_eq!(inserted, 6); + loop { + let count: usize = taos + .query_one("select count(*) from `meters`") + .await? + .unwrap_or_default(); + + if count >= 6 { + break; + } else { + println!("waiting for data"); + } + } + + let mut result = taos.query("select tbname, * from `meters`").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); + } + + // Query option 1, use rows stream. + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + + // Query options 2, use deserialization with serde. + #[derive(Debug, serde::Deserialize)] + #[allow(dead_code)] + struct Record { + tbname: String, + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, + groupid: i32, + // binary/varchar to String + location: String, + } + + let records: Vec = taos + .query("select tbname, * from `meters`") + .await? + .deserialize() + .try_collect() + .await?; + + dbg!(result.summary()); + assert_eq!(records.len(), 6); + dbg!(records); + Ok(()) +} diff --git a/examples/rust/examples/subscribe.rs b/examples/rust/examples/subscribe.rs new file mode 100644 index 0000000000..9e2e890405 --- /dev/null +++ b/examples/rust/examples/subscribe.rs @@ -0,0 +1,103 @@ +use std::time::Duration; + +use chrono::{DateTime, Local}; +use taos::*; + +// Query options 2, use deserialization with serde. +#[derive(Debug, serde::Deserialize)] +#[allow(dead_code)] +struct Record { + // deserialize timestamp to chrono::DateTime + ts: DateTime, + // float to f32 + current: Option, + // int to i32 + voltage: Option, + phase: Option, +} + +async fn prepare(taos: Taos) -> anyhow::Result<()> { + let inserted = taos.exec_many([ + // create child table + "CREATE TABLE `d0` USING `meters` TAGS(0, 'Los Angles')", + // insert into child table + "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)", + // insert with NULL values + "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)", + // insert and automatically create table with tags if not exists + "INSERT INTO `d1` USING `meters` TAGS(1, 'San Francisco') values(now - 9s, 10.1, 119, 0.33)", + // insert many records in a single sql + "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)", + ]).await?; + assert_eq!(inserted, 6); + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // std::env::set_var("RUST_LOG", "debug"); + pretty_env_logger::init(); + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + let db = "tmq"; + + // prepare database + taos.exec_many([ + "DROP TOPIC IF EXISTS tmq_meters".to_string(), + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(16))".to_string(), + // create topic for subscription + format!("CREATE TOPIC tmq_meters with META AS DATABASE {db}") + ]) + .await?; + + let task = tokio::spawn(prepare(taos)); + + tokio::time::sleep(Duration::from_secs(1)).await; + + // subscribe + let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?; + + let mut consumer = tmq.build()?; + consumer.subscribe(["tmq_meters"]).await?; + + { + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. + let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } + } + + consumer.unsubscribe().await; + + task.await??; + + Ok(()) +} diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs new file mode 100644 index 0000000000..e7a11a969c --- /dev/null +++ b/examples/rust/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cc15d4ed6b..fa6f4b6c79 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2555,10 +2555,14 @@ typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; int64_t ntbUid; SArray* colIdList; // SArray -} SCheckAlterInfo; +} STqCheckInfo; -int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo); -int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo); +int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo); +int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo); + +typedef struct { + char topic[TSDB_TOPIC_FNAME_LEN]; +} STqDelCheckInfoReq; typedef struct { int32_t vgId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 6462c7afbf..b16df0e885 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -188,7 +188,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) - TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ADD_CHECK_INFO, "vnode-add-check-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE_CHECK_INFO, "vnode-delete-check-info", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eac92d76ba..e6fcb021d5 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -515,7 +515,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7dd3ce34c3..533d924546 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4262,7 +4262,6 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp tDecoderClear(&decoder); return 0; } - int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; @@ -4300,7 +4299,6 @@ int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pRe tEndDecode(decoder); return 0; } - int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5590,7 +5588,6 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { return 0; } -#if 1 int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { if (pVal->type == TMQ_OFFSET__RESET_NONE) { snprintf(buf, maxLen, "offset(reset to none)"); @@ -5609,7 +5606,6 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) { } return 0; } -#endif bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) { if (pLeft->type == pRight->type) { @@ -5643,7 +5639,7 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { return 0; } -int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) { +int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) { if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1; if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1; int32_t sz = taosArrayGetSize(pInfo->colIdList); @@ -5655,7 +5651,7 @@ int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) return pEncoder->pos; } -int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) { +int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) { if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1; if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1; int32_t sz; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 647af20fcf..ec761e6441 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -225,7 +225,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 7c6807ab87..8eb3ed3901 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -361,7 +361,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ADD_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE_CHECK_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 9f6108004d..037a46345f 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -15,10 +15,10 @@ #define _DEFAULT_SOURCE #include "mndOffset.h" -#include "mndPrivilege.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" +#include "mndPrivilege.h" #include "mndShow.h" #include "mndStb.h" #include "mndTopic.h" @@ -305,7 +305,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { sdbRelease(pSdb, pOffset); } - return code; + return code; } int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 3f310ee9c0..10e520d9ec 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -356,31 +356,44 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } + ASSERT(pIter == NULL); // 7. handle unassigned vg if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { // if has consumer, assign all left vg while (1) { + SMqConsumerEp *pConsumerEp = NULL; pRemovedIter = taosHashIterate(pHash, pRemovedIter); - if (pRemovedIter == NULL) break; - pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); - ASSERT(pIter); + if (pRemovedIter == NULL) { + if (pIter != NULL) { + taosHashCancelIterate(pOutput->pSub->consumerHash, pIter); + pIter = NULL; + } + break; + } + while (1) { + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + ASSERT(pIter); + pConsumerEp = (SMqConsumerEp *)pIter; + ASSERT(pConsumerEp->consumerId > 0); + if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { + break; + } + } pRebVg = (SMqRebOutputVg *)pRemovedIter; - SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; - ASSERT(pConsumerEp->consumerId > 0); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); pRebVg->newConsumerId = pConsumerEp->consumerId; if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { - mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); continue; } taosArrayPush(pOutput->rebVgs, pRebVg); - mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, + mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId); } } else { @@ -571,7 +584,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); /*ASSERT(pTopic);*/ if (pTopic == NULL) { - mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic); + mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic); continue; } taosRLockLatch(&pTopic->lock); @@ -601,7 +614,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { // TODO replace assert with error check if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { - mError("persist rebalance output error, possibly vnode splitted or dropped"); + mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped"); } taosArrayDestroy(pRebInfo->lostConsumers); taosArrayDestroy(pRebInfo->newConsumers); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 820bb4b636..ff208eae60 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -57,7 +57,8 @@ int32_t mndInitTopic(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_ADD_CHECK_INFO_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DELETE_CHECK_INFO_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); @@ -450,7 +451,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); if (topicObj.ntbUid != 0) { - SCheckAlterInfo info; + STqCheckInfo info; memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN); info.ntbUid = topicObj.ntbUid; info.colIdList = topicObj.ntbColIds; @@ -470,7 +471,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * // encoder check alter info int32_t len; int32_t code; - tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code); + tEncodeSize(tEncodeSTqCheckInfo, &info, len, code); if (code < 0) { sdbRelease(pSdb, pVgroup); mndTransDrop(pTrans); @@ -481,7 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; tEncoderInit(&encoder, abuf, len); - if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) { + if (tEncodeSTqCheckInfo(&encoder, &info) < 0) { sdbRelease(pSdb, pVgroup); mndTransDrop(pTrans); return -1; @@ -493,7 +494,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = buf; action.contLen = sizeof(SMsgHead) + len; - action.msgType = TDMT_VND_CHECK_ALTER_INFO; + action.msgType = TDMT_VND_ADD_CHECK_INFO; if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); sdbRelease(pSdb, pVgroup); @@ -659,12 +660,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); +#if 0 if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { ASSERT(0); mndTransDrop(pTrans); mndReleaseTopic(pMnode, pTopic); return -1; } +#endif // TODO check if rebalancing if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { @@ -675,6 +678,37 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } + if (pTopic->ntbUid != 0) { + // broadcast to all vnode + void *pIter = NULL; + SVgObj *pVgroup = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (!mndVgroupInDb(pVgroup, pTopic->dbUid)) { + sdbRelease(pSdb, pVgroup); + continue; + } + + void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + ((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId); + memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN; + action.msgType = TDMT_VND_DELETE_CHECK_INFO; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + sdbRelease(pSdb, pVgroup); + mndTransDrop(pTrans); + return -1; + } + } + } + int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic); mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index b218d982e9..a3e17f5377 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -24,6 +24,7 @@ target_sources( "src/meta/metaCommit.c" "src/meta/metaEntry.c" "src/meta/metaSnapshot.c" + "src/meta/metaCache.c" # sma "src/sma/smaEnv.c" diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index a72546fe86..2efc33a8ee 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -23,8 +23,9 @@ extern "C" { #endif -typedef struct SMetaIdx SMetaIdx; -typedef struct SMetaDB SMetaDB; +typedef struct SMetaIdx SMetaIdx; +typedef struct SMetaDB SMetaDB; +typedef struct SMetaCache SMetaCache; // metaDebug ================== // clang-format off @@ -60,6 +61,13 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64() // metaTable ================== int metaHandleEntry(SMeta* pMeta, const SMetaEntry* pME); +// metaCache ================== +int32_t metaCacheOpen(SMeta* pMeta); +void metaCacheClose(SMeta* pMeta); +int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo); +int32_t metaCacheDrop(SMeta* pMeta, int64_t uid); +int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo); + struct SMeta { TdThreadRwlock lock; @@ -84,6 +92,8 @@ struct SMeta { TTB* pStreamDb; SMetaIdx* pIdx; + + SMetaCache* pCache; }; typedef struct { @@ -92,6 +102,12 @@ typedef struct { } STbDbKey; #pragma pack(push, 1) +typedef struct { + tb_uid_t suid; + int64_t version; + int32_t skmVer; +} SUidIdxVal; + typedef struct { tb_uid_t uid; int32_t sver; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a1dba41c94..cb5ec7aabe 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -117,16 +117,15 @@ typedef struct { struct STQ { SVnode* pVnode; char* path; - SHashObj* pushMgr; // consumerId -> STqHandle* - SHashObj* handles; // subKey -> STqHandle - SHashObj* pAlterInfo; // topic -> SAlterCheckInfo + SHashObj* pPushMgr; // consumerId -> STqHandle* + SHashObj* pHandle; // subKey -> STqHandle + SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; - TDB* pMetaStore; + TDB* pMetaDB; TTB* pExecStore; - - TTB* pAlterInfoStore; + TTB* pCheckStore; SStreamMeta* pStreamMeta; }; @@ -155,6 +154,9 @@ int32_t tqMetaClose(STQ* pTq); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaRestoreHandle(STQ* pTq); +int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); +int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); +int32_t tqMetaRestoreCheckInfo(STQ* pTq); typedef struct { int32_t size; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 35c26eac44..0c7a08a2b5 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -130,6 +130,14 @@ int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); +typedef struct SMetaInfo { + int64_t uid; + int64_t suid; + int64_t version; + int32_t skmVer; +} SMetaInfo; +int32_t metaGetInfo(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo); + // tsdb int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); int tsdbClose(STsdb** pTsdb); @@ -155,13 +163,16 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); -int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver); +// tq-mq +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); +// tq-stream +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c new file mode 100644 index 0000000000..b8cc9f0df2 --- /dev/null +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "meta.h" + +#define META_CACHE_BASE_BUCKET 1024 + +// (uid , suid) : child table +// (uid, 0) : normal table +// (suid, suid) : super table +typedef struct SMetaCacheEntry SMetaCacheEntry; +struct SMetaCacheEntry { + SMetaCacheEntry* next; + SMetaInfo info; +}; + +struct SMetaCache { + int32_t nEntry; + int32_t nBucket; + SMetaCacheEntry** aBucket; +}; + +int32_t metaCacheOpen(SMeta* pMeta) { + int32_t code = 0; + SMetaCache* pCache = NULL; + + pCache = (SMetaCache*)taosMemoryMalloc(sizeof(SMetaCache)); + if (pCache == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + pCache->nEntry = 0; + pCache->nBucket = META_CACHE_BASE_BUCKET; + pCache->aBucket = (SMetaCacheEntry**)taosMemoryCalloc(pCache->nBucket, sizeof(SMetaCacheEntry*)); + if (pCache->aBucket == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pCache); + goto _err; + } + + pMeta->pCache = pCache; + +_exit: + return code; + +_err: + metaError("vgId:%d meta open cache failed since %s", TD_VID(pMeta->pVnode), tstrerror(code)); + return code; +} + +void metaCacheClose(SMeta* pMeta) { + if (pMeta->pCache) { + for (int32_t iBucket = 0; iBucket < pMeta->pCache->nBucket; iBucket++) { + SMetaCacheEntry* pEntry = pMeta->pCache->aBucket[iBucket]; + while (pEntry) { + SMetaCacheEntry* tEntry = pEntry->next; + taosMemoryFree(pEntry); + pEntry = tEntry; + } + } + taosMemoryFree(pMeta->pCache->aBucket); + taosMemoryFree(pMeta->pCache); + pMeta->pCache = NULL; + } +} + +static int32_t metaRehashCache(SMetaCache* pCache, int8_t expand) { + int32_t code = 0; + int32_t nBucket; + + if (expand) { + nBucket = pCache->nBucket * 2; + } else { + nBucket = pCache->nBucket / 2; + } + + SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*)); + if (aBucket == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // rehash + for (int32_t iBucket = 0; iBucket < pCache->nBucket; iBucket++) { + SMetaCacheEntry* pEntry = pCache->aBucket[iBucket]; + + while (pEntry) { + SMetaCacheEntry* pTEntry = pEntry->next; + + pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket]; + aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry; + + pEntry = pTEntry; + } + } + + // final set + taosMemoryFree(pCache->aBucket); + pCache->nBucket = nBucket; + pCache->aBucket = aBucket; + +_exit: + return code; +} + +int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) { + int32_t code = 0; + + // ASSERT(metaIsWLocked(pMeta)); + + // search + SMetaCache* pCache = pMeta->pCache; + int32_t iBucket = TABS(pInfo->uid) % pCache->nBucket; + SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket]; + while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) { + ppEntry = &(*ppEntry)->next; + } + + if (*ppEntry) { // update + ASSERT(pInfo->suid == (*ppEntry)->info.suid); + if (pInfo->version > (*ppEntry)->info.version) { + (*ppEntry)->info.version = pInfo->version; + (*ppEntry)->info.skmVer = pInfo->skmVer; + } + } else { // insert + if (pCache->nEntry >= pCache->nBucket) { + code = metaRehashCache(pCache, 1); + if (code) goto _exit; + + iBucket = TABS(pInfo->uid) % pCache->nBucket; + } + + SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew)); + if (pEntryNew == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pEntryNew->info = *pInfo; + pEntryNew->next = pCache->aBucket[iBucket]; + pCache->aBucket[iBucket] = pEntryNew; + pCache->nEntry++; + } + +_exit: + return code; +} + +int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) { + int32_t code = 0; + + SMetaCache* pCache = pMeta->pCache; + int32_t iBucket = TABS(uid) % pCache->nBucket; + SMetaCacheEntry** ppEntry = &pCache->aBucket[iBucket]; + while (*ppEntry && (*ppEntry)->info.uid != uid) { + ppEntry = &(*ppEntry)->next; + } + + SMetaCacheEntry* pEntry = *ppEntry; + if (pEntry) { + *ppEntry = pEntry->next; + taosMemoryFree(pEntry); + pCache->nEntry--; + if (pCache->nEntry < pCache->nBucket / 4 && pCache->nBucket > META_CACHE_BASE_BUCKET) { + code = metaRehashCache(pCache, 0); + if (code) goto _exit; + } + } else { + code = TSDB_CODE_NOT_FOUND; + } + +_exit: + return code; +} + +int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) { + int32_t code = 0; + + SMetaCache* pCache = pMeta->pCache; + int32_t iBucket = TABS(uid) % pCache->nBucket; + SMetaCacheEntry* pEntry = pCache->aBucket[iBucket]; + + while (pEntry && pEntry->info.uid != uid) { + pEntry = pEntry->next; + } + + if (pEntry) { + *pInfo = pEntry->info; + } else { + code = TSDB_CODE_NOT_FOUND; + } + + return code; +} diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 941d2c6d72..cf17459bc2 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -73,7 +73,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { } // open pUidIdx - ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); + ret = tdbTbOpen("uid.idx", sizeof(tb_uid_t), sizeof(SUidIdxVal), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); if (ret < 0) { metaError("vgId:%d, failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; @@ -143,6 +143,13 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { goto _err; } + int32_t code = metaCacheOpen(pMeta); + if (code) { + terrno = code; + metaError("vgId:%d, failed to open meta cache since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + metaDebug("vgId:%d, meta is opened", TD_VID(pVnode)); *ppMeta = pMeta; @@ -169,6 +176,7 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { + if (pMeta->pCache) metaCacheClose(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index eed0ae5e14..6ff55d2f4e 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -63,7 +63,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { return -1; } - version = *(int64_t *)pReader->pBuf; + version = ((SUidIdxVal *)pReader->pBuf)[0].version; return metaGetTableEntryByVersion(pReader, version, uid); } @@ -160,7 +160,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) { tDecoderClear(&pTbCur->mr.coder); - metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey); + metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey); if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) { continue; } @@ -185,7 +185,7 @@ _query: goto _err; } - version = *(int64_t *)pData; + version = ((SUidIdxVal *)pData)[0].version; tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData); SMetaEntry me = {0}; @@ -888,3 +888,41 @@ END: return ret; } + +int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) { + int32_t code = 0; + void *pData = NULL; + int nData = 0; + + metaRLock(pMeta); + + // search cache + if (metaCacheGet(pMeta, uid, pInfo) == 0) { + metaULock(pMeta); + goto _exit; + } + + // search TDB + if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) { + // not found + metaULock(pMeta); + code = TSDB_CODE_NOT_FOUND; + goto _exit; + } + + metaULock(pMeta); + + pInfo->uid = uid; + pInfo->suid = ((SUidIdxVal *)pData)->suid; + pInfo->version = ((SUidIdxVal *)pData)->version; + pInfo->skmVer = ((SUidIdxVal *)pData)->skmVer; + + // upsert the cache + metaWLock(pMeta); + metaCacheUpsert(pMeta, pInfo); + metaULock(pMeta); + +_exit: + tdbFree(pData); + return code; +} diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index 1e5b699fce..3ada7d1814 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -28,9 +28,9 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { int vLen = 0; const void *pKey = NULL; const void *pVal = NULL; - void * pBuf = NULL; + void *pBuf = NULL; int32_t szBuf = 0; - void * p = NULL; + void *p = NULL; SMetaReader mr = {0}; // validate req @@ -83,8 +83,8 @@ int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) { static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) { STbDbKey tbDbKey; - void * pKey = NULL; - void * pVal = NULL; + void *pKey = NULL; + void *pVal = NULL; int kLen = 0; int vLen = 0; SEncoder coder = {0}; @@ -130,7 +130,8 @@ _err: } static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); + SUidIdxVal uidIdxVal = {.suid = pME->smaEntry.tsma->indexUid, .version = pME->version, .skmVer = 0}; + return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn); } static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index a1c1215a92..e8755e0035 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -27,6 +27,23 @@ static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry); static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type); +static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) { + pInfo->uid = pEntry->uid; + pInfo->version = pEntry->version; + if (pEntry->type == TSDB_SUPER_TABLE) { + pInfo->suid = pEntry->uid; + pInfo->skmVer = pEntry->stbEntry.schemaRow.version; + } else if (pEntry->type == TSDB_CHILD_TABLE) { + pInfo->suid = pEntry->ctbEntry.suid; + pInfo->skmVer = 0; + } else if (pEntry->type == TSDB_NORMAL_TABLE) { + pInfo->suid = 0; + pInfo->skmVer = pEntry->ntbEntry.schemaRow.version; + } else { + ASSERT(0); + } +} + static int metaUpdateMetaRsp(tb_uid_t uid, char *tbName, SSchemaWrapper *pSchema, STableMetaRsp *pMetaRsp) { pMetaRsp->pSchemas = taosMemoryMalloc(pSchema->nCols * sizeof(SSchema)); if (NULL == pMetaRsp->pSchemas) { @@ -171,32 +188,22 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { void *pBuf = NULL; int32_t szBuf = 0; void *p = NULL; - SMetaReader mr = {0}; // validate req - metaReaderInit(&mr, pMeta, 0); - if (metaGetTableEntryByName(&mr, pReq->name) == 0) { - if (mr.me.type == TSDB_SUPER_TABLE) { - metaReaderClear(&mr); + void *pData = NULL; + int nData = 0; + if (tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name), &pData, &nData) == 0) { + tb_uid_t uid = *(tb_uid_t *)pData; + tdbFree(pData); + SMetaInfo info; + metaGetInfo(pMeta, uid, &info); + if (info.uid == info.suid) { return 0; } else { terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; - metaReaderClear(&mr); return -1; } - /* -// TODO: just for pass case -#if 0 - terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; - metaReaderClear(&mr); - return -1; -#else - metaReaderClear(&mr); - return 0; -#endif - */ } - metaReaderClear(&mr); // set structs me.version = version; @@ -275,8 +282,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb // drop super table _drop_super_table: tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData); - tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey), - &pMeta->txn); + tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = ((SUidIdxVal *)pData)[0].version, .uid = pReq->suid}, + sizeof(STbDbKey), &pMeta->txn); tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn); tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn); tdbTbDelete(pMeta->pSuidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn); @@ -319,7 +326,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { return -1; } - oversion = *(int64_t *)pData; + oversion = ((SUidIdxVal *)pData)[0].version; tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); ret = tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = pReq->suid, .version = oversion}), sizeof(STbDbKey), &c); @@ -346,15 +353,14 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { metaSaveToSkmDb(pMeta, &nStbEntry); } - // if (oStbEntry.stbEntry.schemaTag.sver != pReq->schemaTag.sver) { - // // change tag schema - // } - // update table.db metaSaveToTbDb(pMeta, &nStbEntry); // update uid index - tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &version, sizeof(version), 0); + SMetaInfo info; + metaGetEntryInfo(&nStbEntry, &info); + tdbTbcUpsert(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), + &(SUidIdxVal){.suid = info.suid, .version = info.version, .skmVer = info.skmVer}, sizeof(SUidIdxVal), 0); if (oStbEntry.pBuf) taosMemoryFree(oStbEntry.pBuf); metaULock(pMeta); @@ -513,7 +519,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { SDecoder dc = {0}; rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData); - int64_t version = *(int64_t *)pData; + int64_t version = ((SUidIdxVal *)pData)[0].version; tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData); @@ -527,7 +533,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { int tLen = 0; if (tdbTbGet(pMeta->pUidIdx, &e.ctbEntry.suid, sizeof(tb_uid_t), &tData, &tLen) == 0) { - version = *(int64_t *)tData; + version = ((SUidIdxVal *)tData)[0].version; STbDbKey tbDbKey = {.uid = e.ctbEntry.suid, .version = version}; if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &tData, &tLen) == 0) { SDecoder tdc = {0}; @@ -566,6 +572,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { --pMeta->pVnode->config.vndStats.numOfSTables; } + metaCacheDrop(pMeta, uid); + tDecoderClear(&dc); tdbFree(pData); @@ -604,7 +612,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ASSERT(c == 0); tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); - oversion = *(int64_t *)pData; + oversion = ((SUidIdxVal *)pData)[0].version; // search table.db TBC *pTbDbc = NULL; @@ -718,7 +726,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl // save to table db metaSaveToTbDb(pMeta, &entry); - tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0); + metaUpdateUidIdx(pMeta, &entry); metaSaveToSkmDb(pMeta, &entry); @@ -774,7 +782,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ASSERT(c == 0); tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); - oversion = *(int64_t *)pData; + oversion = ((SUidIdxVal *)pData)[0].version; // search table.db TBC *pTbDbc = NULL; @@ -794,8 +802,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA /* get stbEntry*/ tdbTbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal); - tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey), - (void **)&stbEntry.pBuf, &nVal); + tdbTbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = ((SUidIdxVal *)pVal)[0].version}), + sizeof(STbDbKey), (void **)&stbEntry.pBuf, &nVal); tdbFree(pVal); tDecoderInit(&dc2, stbEntry.pBuf, nVal); metaDecodeEntry(&dc2, &stbEntry); @@ -869,7 +877,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA metaSaveToTbDb(pMeta, &ctbEntry); // save to uid.idx - tdbTbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn); + metaUpdateUidIdx(pMeta, &ctbEntry); if (iCol == 0) { metaUpdateTagIdx(pMeta, &ctbEntry); @@ -924,7 +932,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p ASSERT(c == 0); tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); - oversion = *(int64_t *)pData; + oversion = ((SUidIdxVal *)pData)[0].version; // search table.db TBC *pTbDbc = NULL; @@ -969,7 +977,7 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p // save to table db metaSaveToTbDb(pMeta, &entry); - tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0); + metaUpdateUidIdx(pMeta, &entry); metaULock(pMeta); tdbTbcClose(pTbDbc); @@ -1052,7 +1060,14 @@ _err: } static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); + // upsert cache + SMetaInfo info; + metaGetEntryInfo(pME, &info); + metaCacheUpsert(pMeta, &info); + + SUidIdxVal uidIdxVal = {.suid = info.suid, .version = info.version, .skmVer = info.skmVer}; + + return tdbTbUpsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &uidIdxVal, sizeof(uidIdxVal), &pMeta->txn); } static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME) { @@ -1128,7 +1143,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { return -1; } tbDbKey.uid = pCtbEntry->ctbEntry.suid; - tbDbKey.version = *(int64_t *)pData; + tbDbKey.version = ((SUidIdxVal *)pData)[0].version; tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData); tDecoderInit(&dc, pData, nData); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 112543e340..a98fea1988 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -60,11 +60,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { pTq->path = strdup(path); pTq->pVnode = pVnode; - pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); - pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (tqMetaOpen(pTq) < 0) { ASSERT(0); @@ -85,9 +85,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { void tqClose(STQ* pTq) { if (pTq) { tqOffsetClose(pTq->pOffsetStore); - taosHashCleanup(pTq->handles); - taosHashCleanup(pTq->pushMgr); - taosHashCleanup(pTq->pAlterInfo); + taosHashCleanup(pTq->pHandle); + taosHashCleanup(pTq->pPushMgr); + taosHashCleanup(pTq->pCheckInfo); taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); @@ -183,7 +183,12 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) { +static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { + return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && + pLeft->val.version <= pRight->val.version; +} + +int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { STqOffset offset = {0}; SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); @@ -199,19 +204,24 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve } else if (offset.val.type == TMQ_OFFSET__LOG) { tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey, TD_VID(pTq->pVnode), offset.val.version); + if (offset.val.version + 1 == version) { + offset.val.version += 1; + } } else { ASSERT(0); } - /*STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);*/ - /*if (pOffset != NULL) {*/ - /*if (pOffset->val.type == TMQ_OFFSET__LOG && pOffset->val.version < offset.val.version) {*/ + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); + if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) { + return 0; + } + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { ASSERT(0); return -1; } if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { if (walRefVer(pHandle->pRef, offset.val.version) < 0) { ASSERT(0); @@ -220,6 +230,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve } } + // rsp + /*}*/ /*}*/ @@ -229,15 +241,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ve int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pTq->pAlterInfo, pIter); + pIter = taosHashIterate(pTq->pCheckInfo, pIter); if (pIter == NULL) break; - SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter; + STqCheckInfo* pCheck = (STqCheckInfo*)pIter; if (pCheck->ntbUid == tbUid) { int32_t sz = taosArrayGetSize(pCheck->colIdList); for (int32_t i = 0; i < sz; i++) { int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i); if (forbidColId == colId) { - taosHashCancelIterate(pTq->pAlterInfo, pIter); + taosHashCancelIterate(pTq->pCheckInfo, pIter); return -1; } } @@ -289,7 +301,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SWalCkHead* pCkHead = NULL; // 1.find handle - STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ if (pHandle == NULL) { tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId, @@ -478,10 +490,10 @@ OVER: return code; } -int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessVgDeleteReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t code = taosHashRemove(pTq->handles, pReq->subKey, strlen(pReq->subKey)); + int32_t code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); ASSERT(code == 0); tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); @@ -492,27 +504,43 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) { return 0; } -int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) { - SCheckAlterInfo info = {0}; - SDecoder decoder; +int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + STqCheckInfo info = {0}; + SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); - if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) { + if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } tDecoderClear(&decoder); - if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) { + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } return 0; } -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { + if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (tqMetaDeleteCheckInfo(pTq, msg) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock - STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, @@ -579,7 +607,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); taosArrayDestroy(tbUidList); } - taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); + taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer %" PRId64, req.subKey, pHandle->consumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { // TODO @@ -668,34 +696,9 @@ FAIL: return code; } -int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { // - return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen); -#if 0 - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return -1; - } - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); - goto FAIL; - } - tDecoderClear(&decoder); - - if (tqExpandTask(pTq, pTask) < 0) { - goto FAIL; - } - - taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); - - return 0; - -FAIL: - if (pTask) taosMemoryFree(pTask); - return -1; -#endif + return streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen); } int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { @@ -817,7 +820,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } } -int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 5709ad7c85..405bc669bd 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -43,6 +43,185 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { return 0; } +int32_t tqMetaOpen(STQ* pTq) { + if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB) < 0) { + ASSERT(0); + return -1; + } + + if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore) < 0) { + ASSERT(0); + return -1; + } + + if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore) < 0) { + ASSERT(0); + return -1; + } + + if (tqMetaRestoreHandle(pTq) < 0) { + return -1; + } + + if (tqMetaRestoreCheckInfo(pTq) < 0) { + return -1; + } + + return 0; +} + +int32_t tqMetaClose(STQ* pTq) { + if (pTq->pExecStore) { + tdbTbClose(pTq->pExecStore); + } + if (pTq->pCheckStore) { + tdbTbClose(pTq->pCheckStore); + } + tdbClose(pTq->pMetaDB); + return 0; +} + +int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) { + TXN txn; + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + return -1; + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + return -1; + } + + if (tdbTbUpsert(pTq->pExecStore, key, strlen(key), value, vLen, &txn) < 0) { + return -1; + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + return -1; + } + + return 0; +} + +int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) { + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), &txn) < 0) { + /*ASSERT(0);*/ + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + return 0; +} + +int32_t tqMetaRestoreCheckInfo(STQ* pTq) { + TBC* pCur = NULL; + if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) { + ASSERT(0); + return -1; + } + + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; + SDecoder decoder; + + tdbTbcMoveToFirst(pCur); + + while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + STqCheckInfo info; + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); + if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + tDecoderClear(&decoder); + if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + tdbTbcClose(pCur); + return 0; +} + +int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { + int32_t code; + int32_t vlen; + tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); + ASSERT(code == 0); + + tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId, + TD_VID(pTq->pVnode)); + + void* buf = taosMemoryCalloc(1, vlen); + if (buf == NULL) { + ASSERT(0); + } + + SEncoder encoder; + tEncoderInit(&encoder, buf, vlen); + + if (tEncodeSTqHandle(&encoder, pHandle) < 0) { + ASSERT(0); + } + + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { + ASSERT(0); + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + tEncoderClear(&encoder); + taosMemoryFree(buf); + return 0; +} + +int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { + TXN txn; + + if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + ASSERT(0); + } + + if (tdbBegin(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) { + /*ASSERT(0);*/ + } + + if (tdbCommit(pTq->pMetaDB, &txn) < 0) { + ASSERT(0); + } + + return 0; +} + int32_t tqMetaRestoreHandle(STQ* pTq) { TBC* pCur = NULL; if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) { @@ -93,101 +272,10 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); - taosHashPut(pTq->handles, pKey, kLen, &handle, sizeof(STqHandle)); + taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } tdbTbcClose(pCur); return 0; } -int32_t tqMetaOpen(STQ* pTq) { - if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaStore) < 0) { - ASSERT(0); - return -1; - } - - if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { - ASSERT(0); - return -1; - } - - if (tqMetaRestoreHandle(pTq) < 0) { - return -1; - } - - return 0; -} - -int32_t tqMetaClose(STQ* pTq) { - if (pTq->pExecStore) { - tdbTbClose(pTq->pExecStore); - } - tdbClose(pTq->pMetaStore); - return 0; -} - -int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) { - int32_t code; - int32_t vlen; - tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code); - ASSERT(code == 0); - - tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, strlen(pHandle->subKey), pHandle->consumerId, - TD_VID(pTq->pVnode)); - - void* buf = taosMemoryCalloc(1, vlen); - if (buf == NULL) { - ASSERT(0); - } - - SEncoder encoder; - tEncoderInit(&encoder, buf, vlen); - - if (tEncodeSTqHandle(&encoder, pHandle) < 0) { - ASSERT(0); - } - - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); - } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, &txn) < 0) { - ASSERT(0); - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - tEncoderClear(&encoder); - taosMemoryFree(buf); - return 0; -} - -int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) { - TXN txn; - - if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - ASSERT(0); - } - - if (tdbBegin(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), &txn) < 0) { - /*ASSERT(0);*/ - } - - if (tdbCommit(pTq->pMetaStore, &txn) < 0) { - ASSERT(0); - } - - return 0; -} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5d7814a045..b8803b20fc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -394,7 +394,7 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { void* pIter = NULL; while (1) { - pIter = taosHashIterate(pTq->handles, pIter); + pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index b4a7ce7737..c52e0e2c09 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -165,9 +165,9 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { STQ* pTq = pWriter->pTq; if (rollback) { - ASSERT(0); + tdbAbort(pWriter->pTq->pMetaDB, &pWriter->txn); } else { - code = tdbCommit(pWriter->pTq->pMetaStore, &pWriter->txn); + code = tdbCommit(pWriter->pTq->pMetaDB, &pWriter->txn); if (code) goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 6fc6636623..34b37ffe9b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -108,29 +108,21 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI STbData *pTbData = NULL; tb_uid_t suid = pMsgIter->suid; tb_uid_t uid = pMsgIter->uid; - int32_t sverNew; - // check if table exists (todo: refact) - SMetaReader mr = {0}; - // SMetaEntry me = {0}; - metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); - if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) { - metaReaderClear(&mr); - code = TSDB_CODE_PAR_TABLE_NOT_EXIST; + SMetaInfo info; + code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info); + if (code) { + code = TSDB_CODE_TDB_TABLE_NOT_EXIST; goto _err; } - if (pRsp->tblFName) strcat(pRsp->tblFName, mr.me.name); - - if (mr.me.type == TSDB_NORMAL_TABLE) { - sverNew = mr.me.ntbEntry.schemaRow.version; - } else { - tDecoderClear(&mr.coder); - - metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); - sverNew = mr.me.stbEntry.schemaRow.version; + if (info.suid != suid) { + code = TSDB_CODE_INVALID_MSG; + goto _err; } - metaReaderClear(&mr); - pRsp->sver = sverNew; + if (info.suid) { + metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info); + } + pRsp->sver = info.skmVer; // create/get STbData to op code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData); @@ -157,7 +149,17 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid SVBufPool *pPool = pTsdb->pVnode->inUse; TSDBKEY lastKey = {.version = version, .ts = eKey}; - // check if table exists (todo) + // check if table exists + SMetaInfo info; + code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info); + if (code) { + code = TSDB_CODE_TDB_TABLE_NOT_EXIST; + goto _err; + } + if (info.suid != suid) { + code = TSDB_CODE_INVALID_MSG; + goto _err; + } code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData); if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 60f0b18a62..76af751196 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1467,6 +1467,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { SColVal colVal; SColVal *pColVal = &colVal; + memset(pColAgg, 0, sizeof(*pColAgg)); + bool minAssigned = false; + bool maxAssigned = false; + *pColAgg = (SColumnDataAgg){.colId = pColData->cid}; for (int32_t iVal = 0; iVal < pColData->nVal; iVal++) { tColDataGetValue(pColData, iVal, pColVal); @@ -1481,72 +1485,86 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { break; case TSDB_DATA_TYPE_TINYINT: { pColAgg->sum += colVal.value.i8; - if (pColAgg->min > colVal.value.i8) { + if (!minAssigned || pColAgg->min > colVal.value.i8) { pColAgg->min = colVal.value.i8; + minAssigned = true; } - if (pColAgg->max < colVal.value.i8) { + if (!maxAssigned || pColAgg->max < colVal.value.i8) { pColAgg->max = colVal.value.i8; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_SMALLINT: { pColAgg->sum += colVal.value.i16; - if (pColAgg->min > colVal.value.i16) { + if (!minAssigned || pColAgg->min > colVal.value.i16) { pColAgg->min = colVal.value.i16; + minAssigned = true; } - if (pColAgg->max < colVal.value.i16) { + if (!maxAssigned || pColAgg->max < colVal.value.i16) { pColAgg->max = colVal.value.i16; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_INT: { pColAgg->sum += colVal.value.i32; - if (pColAgg->min > colVal.value.i32) { + if (!minAssigned || pColAgg->min > colVal.value.i32) { pColAgg->min = colVal.value.i32; + minAssigned = true; } - if (pColAgg->max < colVal.value.i32) { + if (!maxAssigned || pColAgg->max < colVal.value.i32) { pColAgg->max = colVal.value.i32; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_BIGINT: { pColAgg->sum += colVal.value.i64; - if (pColAgg->min > colVal.value.i64) { + if (!minAssigned || pColAgg->min > colVal.value.i64) { pColAgg->min = colVal.value.i64; + minAssigned = true; } - if (pColAgg->max < colVal.value.i64) { + if (!maxAssigned || pColAgg->max < colVal.value.i64) { pColAgg->max = colVal.value.i64; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_FLOAT: { *(double*)(&pColAgg->sum) += colVal.value.f; - if (*(double*)(&pColAgg->min) > colVal.value.f) { + if (!minAssigned || *(double*)(&pColAgg->min) > colVal.value.f) { *(double*)(&pColAgg->min) = colVal.value.f; + minAssigned = true; } - if (*(double*)(&pColAgg->max) < colVal.value.f) { + if (!maxAssigned || *(double*)(&pColAgg->max) < colVal.value.f) { *(double*)(&pColAgg->max) = colVal.value.f; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_DOUBLE: { *(double*)(&pColAgg->sum) += colVal.value.d; - if (*(double*)(&pColAgg->min) > colVal.value.d) { + if (!minAssigned || *(double*)(&pColAgg->min) > colVal.value.d) { *(double*)(&pColAgg->min) = colVal.value.d; + minAssigned = true; } - if (*(double*)(&pColAgg->max) < colVal.value.d) { + if (!maxAssigned || *(double*)(&pColAgg->max) < colVal.value.d) { *(double*)(&pColAgg->max) = colVal.value.d; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_VARCHAR: break; case TSDB_DATA_TYPE_TIMESTAMP: { - if (pColAgg->min > colVal.value.i64) { + if (!minAssigned || pColAgg->min > colVal.value.i64) { pColAgg->min = colVal.value.i64; + minAssigned = true; } - if (pColAgg->max < colVal.value.i64) { + if (!maxAssigned || pColAgg->max < colVal.value.i64) { pColAgg->max = colVal.value.i64; + maxAssigned = true; } break; } @@ -1554,41 +1572,49 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { break; case TSDB_DATA_TYPE_UTINYINT: { pColAgg->sum += colVal.value.u8; - if (pColAgg->min > colVal.value.u8) { + if (!minAssigned || pColAgg->min > colVal.value.u8) { pColAgg->min = colVal.value.u8; + minAssigned = true; } - if (pColAgg->max < colVal.value.u8) { + if (!maxAssigned || pColAgg->max < colVal.value.u8) { pColAgg->max = colVal.value.u8; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_USMALLINT: { pColAgg->sum += colVal.value.u16; - if (pColAgg->min > colVal.value.u16) { + if (!minAssigned || pColAgg->min > colVal.value.u16) { pColAgg->min = colVal.value.u16; + minAssigned = true; } - if (pColAgg->max < colVal.value.u16) { + if (!maxAssigned || pColAgg->max < colVal.value.u16) { pColAgg->max = colVal.value.u16; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_UINT: { pColAgg->sum += colVal.value.u32; - if (pColAgg->min > colVal.value.u32) { + if (!minAssigned || pColAgg->min > colVal.value.u32) { pColAgg->min = colVal.value.u32; + minAssigned = true; } - if (pColAgg->max < colVal.value.u32) { + if (!minAssigned || pColAgg->max < colVal.value.u32) { pColAgg->max = colVal.value.u32; + maxAssigned = true; } break; } case TSDB_DATA_TYPE_UBIGINT: { pColAgg->sum += colVal.value.u64; - if (pColAgg->min > colVal.value.u64) { + if (!minAssigned || pColAgg->min > colVal.value.u64) { pColAgg->min = colVal.value.u64; + minAssigned = true; } - if (pColAgg->max < colVal.value.u64) { + if (!maxAssigned || pColAgg->max < colVal.value.u64) { pColAgg->max = colVal.value.u64; + maxAssigned = true; } break; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 43c9b4c09f..54a5aacbd2 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -196,36 +196,42 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; /* TQ */ case TDMT_VND_MQ_VG_CHANGE: - if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + if (tqProcessVgChangeReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_VND_MQ_VG_DELETE: - if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessVgDeleteReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } break; case TDMT_VND_MQ_COMMIT_OFFSET: - if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead), version) < 0) { + if (tqProcessOffsetCommitReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; - case TDMT_VND_CHECK_ALTER_INFO: - if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + case TDMT_VND_ADD_CHECK_INFO: + if (tqProcessAddCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { + goto _err; + } + break; + case TDMT_VND_DELETE_CHECK_INFO: + if (tqProcessDelCheckInfoReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } break; case TDMT_STREAM_TASK_DEPLOY: { - if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), + if (tqProcessTaskDeployReq(pVnode->pTq, version, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if (tqProcessTaskDropReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; @@ -869,7 +875,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); - sprintf(submitBlkRsp.tblFName, "%s.", pVnode->config.dbname); + sprintf(submitBlkRsp.tblFName, "%s.%s", pVnode->config.dbname, createTbReq.name); msgIter.uid = createTbReq.uid; if (createTbReq.type == TSDB_CHILD_TABLE) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 86b9036454..d15dc99122 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3315,7 +3315,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey); } else { - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); + blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId); doApplyScalarCalculation(pOperator, pBlock, order, scanFlag); if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 02089d9fec..8c4fbe7971 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1175,19 +1175,19 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* tsCol = (TSKEY*)pColDataInfo->pData; + bool tableInserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { SResultRowInfo dumyInfo; dumyInfo.cur.pageId = -1; bool isClosed = false; STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; - if (isOverdue(tsCol[rowId], &pInfo->twAggSup)) { + if (tableInserted && isOverdue(tsCol[rowId], &pInfo->twAggSup)) { win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); isClosed = isCloseWindow(&win, &pInfo->twAggSup); } - bool inserted = updateInfoIsTableInserted(pInfo->pUpdateInfo, pBlock->info.uid); // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - bool closedWin = isClosed && inserted && isSignleIntervalWindow(pInfo) && + bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup); if ((update || closedWin) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b234ff97c9..c19b459cdb 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -192,6 +192,24 @@ static bool validateTimezoneFormat(const SValueNode* pVal) { return true; } +static int32_t countTrailingSpaces(const SValueNode* pVal, bool isLtrim) { + int32_t numOfSpaces = 0; + int32_t len = varDataLen(pVal->datum.p); + char* str = varDataVal(pVal->datum.p); + + int32_t startPos = isLtrim ? 0 : len - 1; + int32_t step = isLtrim ? 1 : -1; + for (int32_t i = startPos; i < len || i >= 0; i += step) { + if (!isspace(str[i])) { + break; + } + numOfSpaces++; + } + + return numOfSpaces; + +} + void static addTimezoneParam(SNodeList* pList) { char buf[6] = {0}; time_t t = taosTime(NULL); @@ -293,6 +311,40 @@ static int32_t translateInOutStr(SFunctionNode* pFunc, char* pErrBuf, int32_t le return TSDB_CODE_SUCCESS; } +static int32_t translateTrimStr(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isLtrim) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SExprNode* pPara1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + if (!IS_VAR_DATA_TYPE(pPara1->resType.type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + int32_t numOfSpaces = 0; + SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 0); + // for select trim functions with constant value from table, + // need to set the proper result result schema bytes to avoid + // trailing garbage characters + if (nodeType(pParamNode1) == QUERY_NODE_VALUE) { + SValueNode* pValue = (SValueNode*)pParamNode1; + numOfSpaces = countTrailingSpaces(pValue, isLtrim); + } + + + int32_t resBytes = pPara1->resType.bytes - numOfSpaces; + pFunc->node.resType = (SDataType){.bytes = resBytes, .type = pPara1->resType.type}; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateLtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTrimStr(pFunc, pErrBuf, len, true); +} + +static int32_t translateRtrim(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTrimStr(pFunc, pErrBuf, len, false); +} + static int32_t translateLogarithm(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (1 != numOfParams && 2 != numOfParams) { @@ -2827,7 +2879,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "ltrim", .type = FUNCTION_TYPE_LTRIM, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, - .translateFunc = translateInOutStr, + .translateFunc = translateLtrim, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = ltrimFunction, @@ -2837,7 +2889,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "rtrim", .type = FUNCTION_TYPE_RTRIM, .classification = FUNC_MGT_SCALAR_FUNC | FUNC_MGT_STRING_FUNC, - .translateFunc = translateInOutStr, + .translateFunc = translateRtrim, .getEnvFunc = NULL, .initFunc = NULL, .sprocessFunc = rtrimFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index cbf81f1d0d..5aeaecd598 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1210,7 +1210,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - pBuf->v = val; + *(int64_t*)&pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1223,7 +1223,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { - pBuf->v = val; + *(uint64_t*)&pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1231,11 +1231,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_DOUBLE) { double prev = 0; - GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); + GET_TYPED_DATA(prev, double, type, &pBuf->v); double val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { - pBuf->v = val; + *(double*)&pBuf->v = val; if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); @@ -1243,11 +1243,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } } else if (type == TSDB_DATA_TYPE_FLOAT) { double prev = 0; - GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); + GET_TYPED_DATA(prev, double, type, &pBuf->v); double val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { - pBuf->v = val; + *(double*)&pBuf->v = val; } if (pCtx->subsidiaries.num > 0) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index d0c5a76f4b..0d36fa0845 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -758,7 +758,9 @@ EDealRes sclRewriteFunction(SNode** pNode, SScalarCtx *ctx) { res->datum.p = taosMemoryCalloc(len, 1); memcpy(res->datum.p, output.columnData->pData, len); } else if (IS_VAR_DATA_TYPE(type)) { - res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); + //res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); + res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData), 1); + res->node.resType.bytes = varDataTLen(output.columnData->pData); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { nodesSetValueNodeValue(res, output.columnData->pData); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b74e838628..64a9537e6c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -81,7 +81,7 @@ void streamMetaClose(SStreamMeta* pMeta) { taosMemoryFree(pMeta); } -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) { +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ebad365ce0..add007e14d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -205,28 +205,6 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - uint64_t ahandle = head->ahandle; \ - CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - if (transQueueSize(&conn->cliMsgs) > 0 && ahandle == 0) { \ - SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, 0); \ - if (cliMsg->type == Release) return; \ - } \ - tDebug("%s conn %p receive release request, refId:%" PRId64 "", CONN_GET_INST_LABEL(conn), conn, conn->refId); \ - if (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - destroyCmsg(pMsg); \ - cliReleaseUnfinishedMsg(conn); \ - transQueueClear(&conn->cliMsgs); \ - addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ - return; \ - } \ - } while (0) #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ do { \ @@ -358,7 +336,6 @@ void cliHandleResp(SCliConn* conn) { if (cliRecvReleaseReq(conn, pHead)) { return; } - CONN_SHOULD_RELEASE(conn, pHead); if (CONN_NO_PERSIST_BY_APP(conn)) { pMsg = transQueuePop(&conn->cliMsgs); @@ -1418,7 +1395,7 @@ int transReleaseCliHandle(void* handle) { } STransMsg tmsg = {.info.handle = handle}; - // TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); + TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; diff --git a/tests/system-test/2-query/irate.py b/tests/system-test/2-query/irate.py index a64e7695c3..408f4b3749 100644 --- a/tests/system-test/2-query/irate.py +++ b/tests/system-test/2-query/irate.py @@ -69,7 +69,7 @@ class TDTestCase: comput_irate_value = origin_result[1][0]*1000/( origin_result[1][-1] - origin_result[0][-1]) else: comput_irate_value = (origin_result[1][0] - origin_result[0][0])*1000/( origin_result[1][-1] - origin_result[0][-1]) - if abs(comput_irate_value - irate_value) <= 0.0000001: + if abs(comput_irate_value - irate_value) <= 0.001: # set as 0.001 avoid floating point precision calculation errors tdLog.info(" irate work as expected , sql is %s "% irate_sql) else: tdLog.exit(" irate work not as expected , sql is %s "% irate_sql) diff --git a/tests/system-test/2-query/mavg.py b/tests/system-test/2-query/mavg.py index d7dc5e6143..0995dfc6ff 100644 --- a/tests/system-test/2-query/mavg.py +++ b/tests/system-test/2-query/mavg.py @@ -25,13 +25,13 @@ from util.cases import * from util.sql import * from util.dnodes import * - +dbname = 'db' class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor()) - def mavg_query_form(self, sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""): + def mavg_query_form(self, sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr=f"{dbname}.t1", condition=""): ''' mavg function: @@ -50,7 +50,7 @@ class TDTestCase: return f"{sel} {func} {col} {m_comm} {k} {r_comm} {alias} {fr} {table_expr} {condition}" - def checkmavg(self,sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr="t1", condition=""): + def checkmavg(self,sel="select", func="mavg(", col="c1", m_comm =",", k=1,r_comm=")", alias="", fr="from",table_expr=f"{dbname}.t1", condition=""): # print(self.mavg_query_form(sel=sel, func=func, col=col, m_comm=m_comm, k=k, r_comm=r_comm, alias=alias, fr=fr, # table_expr=table_expr, condition=condition)) line = sys._getframe().f_back.f_lineno @@ -62,7 +62,7 @@ class TDTestCase: table_expr=table_expr, condition=condition )) - sql = "select * from t1" + sql = f"select * from {dbname}.t1" collist = tdSql.getColNameList(sql) if not isinstance(col, str): @@ -326,9 +326,9 @@ class TDTestCase: self.checkmavg(**case6) # # case7~8: nested query - # case7 = {"table_expr": "(select c1 from stb1)"} + # case7 = {"table_expr": f"(select c1 from {dbname}.stb1)"} # self.checkmavg(**case7) - # case8 = {"table_expr": "(select mavg(c1, 1) c1 from stb1 group by tbname)"} + # case8 = {"table_expr": f"(select mavg(c1, 1) c1 from {dbname}.stb1 group by tbname)"} # self.checkmavg(**case8) # case9~10: mix with tbname/ts/tag/col @@ -362,7 +362,7 @@ class TDTestCase: self.checkmavg(**case17) # # case18~19: with group by # case19 = { - # "table_expr": "stb1", + # "table_expr": f"{dbname}.stb1", # "condition": "partition by tbname" # } # self.checkmavg(**case19) @@ -371,14 +371,14 @@ class TDTestCase: # case20 = {"condition": "order by ts"} # self.checkmavg(**case20) #case21 = { - # "table_expr": "stb1", + # "table_expr": f"{dbname}.stb1", # "condition": "group by tbname order by tbname" #} #self.checkmavg(**case21) # # case22: with union # case22 = { - # "condition": "union all select mavg( c1 , 1 ) from t2" + # "condition": f"union all select mavg( c1 , 1 ) from {dbname}.t2" # } # self.checkmavg(**case22) @@ -486,32 +486,33 @@ class TDTestCase: #tdSql.query(" select mavg( c1 , 1 ) + 2 from t1 ") err41 = {"alias": "+ avg(c1)"} self.checkmavg(**err41) # mix with arithmetic 2 - err42 = {"alias": ", c1"} - self.checkmavg(**err42) # mix with other col - # err43 = {"table_expr": "stb1"} + # err42 = {"alias": ", c1"} + # self.checkmavg(**err42) # mix with other col + # err43 = {"table_expr": f"{dbname}.stb1"} # self.checkmavg(**err43) # select stb directly - err44 = { - "col": "stb1.c1", - "table_expr": "stb1, stb2", - "condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts" - } - self.checkmavg(**err44) # stb join + # err44 = { + # "col": "stb1.c1", + # "table_expr": "stb1, stb2", + # "condition": "where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts" + # } + # self.checkmavg(**err44) # stb join + tdSql.query("select mavg( stb1.c1 , 1 ) from stb1, stb2 where stb1.ts=stb2.ts and stb1.st1=stb2.st2 order by stb1.ts;") err45 = { "condition": "where ts>0 and ts < now interval(1h) fill(next)" } self.checkmavg(**err45) # interval err46 = { - "table_expr": "t1", + "table_expr": f"{dbname}.t1", "condition": "group by c6" } self.checkmavg(**err46) # group by normal col err47 = { - "table_expr": "stb1", + "table_expr": f"{dbname}.stb1", "condition": "group by tbname slimit 1 " } # self.checkmavg(**err47) # with slimit err48 = { - "table_expr": "stb1", + "table_expr": f"{dbname}.stb1", "condition": "group by tbname slimit 1 soffset 1" } # self.checkmavg(**err48) # with soffset @@ -554,8 +555,8 @@ class TDTestCase: err67 = {"k": 0.999999} self.checkmavg(**err67) # k: left out of [1, 1000] err68 = { - "table_expr": "stb1", - "condition": "group by tbname order by tbname" # order by tbname not supported + "table_expr": f"{dbname}.stb1", + "condition": f"group by tbname order by tbname" # order by tbname not supported } self.checkmavg(**err68) @@ -565,42 +566,42 @@ class TDTestCase: for i in range(tbnum): for j in range(data_row): tdSql.execute( - f"insert into t{i} values (" + f"insert into {dbname}.t{i} values (" f"{basetime + (j+1)*10}, {random.randint(-200, -1)}, {random.uniform(200, -1)}, {basetime + random.randint(-200, -1)}, " f"'binary_{j}', {random.uniform(-200, -1)}, {random.choice([0,1])}, {random.randint(-200,-1)}, " f"{random.randint(-200, -1)}, {random.randint(-127, -1)}, 'nchar_{j}' )" ) tdSql.execute( - f"insert into t{i} values (" + f"insert into {dbname}.t{i} values (" f"{basetime - (j+1) * 10}, {random.randint(1, 200)}, {random.uniform(1, 200)}, {basetime - random.randint(1, 200)}, " f"'binary_{j}_1', {random.uniform(1, 200)}, {random.choice([0, 1])}, {random.randint(1,200)}, " f"{random.randint(1,200)}, {random.randint(1,127)}, 'nchar_{j}_1' )" ) tdSql.execute( - f"insert into tt{i} values ( {basetime-(j+1) * 10}, {random.randint(1, 200)} )" + f"insert into {dbname}.tt{i} values ( {basetime-(j+1) * 10}, {random.randint(1, 200)} )" ) pass def mavg_test_table(self,tbnum: int) -> None : - tdSql.execute("drop database if exists db") - tdSql.execute("create database if not exists db keep 3650") - tdSql.execute("use db") + tdSql.execute(f"drop database if exists {dbname}") + tdSql.execute(f"create database if not exists {dbname} keep 3650") + tdSql.execute(f"use {dbname}") tdSql.execute( - "create stable db.stb1 (\ + f"create stable {dbname}.stb1 (\ ts timestamp, c1 int, c2 float, c3 timestamp, c4 binary(16), c5 double, c6 bool, \ c7 bigint, c8 smallint, c9 tinyint, c10 nchar(16)\ ) \ tags(st1 int)" ) tdSql.execute( - "create stable db.stb2 (ts timestamp, c1 int) tags(st2 int)" + f"create stable {dbname}.stb2 (ts timestamp, c1 int) tags(st2 int)" ) for i in range(tbnum): - tdSql.execute(f"create table t{i} using stb1 tags({i})") - tdSql.execute(f"create table tt{i} using stb2 tags({i})") + tdSql.execute(f"create table {dbname}.t{i} using {dbname}.stb1 tags({i})") + tdSql.execute(f"create table {dbname}.tt{i} using {dbname}.stb2 tags({i})") pass @@ -617,25 +618,25 @@ class TDTestCase: tdLog.printNoPrefix("######## insert only NULL test:") for i in range(tbnum): - tdSql.execute(f"insert into t{i}(ts) values ({nowtime - 5})") - tdSql.execute(f"insert into t{i}(ts) values ({nowtime + 5})") + tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime - 5})") + tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime + 5})") self.mavg_current_query() self.mavg_error_query() tdLog.printNoPrefix("######## insert data in the range near the max(bigint/double):") # self.mavg_test_table(tbnum) - # tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " + # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values " # f"({nowtime - (per_table_rows + 1) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})") - # tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " + # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values " # f"({nowtime - (per_table_rows + 2) * 10}, {2**31-1}, {3.4*10**38}, {1.7*10**308}, {2**63-1})") # self.mavg_current_query() # self.mavg_error_query() tdLog.printNoPrefix("######## insert data in the range near the min(bigint/double):") # self.mavg_test_table(tbnum) - # tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " + # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values " # f"({nowtime - (per_table_rows + 1) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {1-2**63})") - # tdSql.execute(f"insert into t1(ts, c1,c2,c5,c7) values " + # tdSql.execute(f"insert into {dbname}.t1(ts, c1,c2,c5,c7) values " # f"({nowtime - (per_table_rows + 2) * 10}, {1-2**31}, {-3.4*10**38}, {-1.7*10**308}, {512-2**63})") # self.mavg_current_query() # self.mavg_error_query() @@ -649,9 +650,9 @@ class TDTestCase: tdLog.printNoPrefix("######## insert data mix with NULL test:") for i in range(tbnum): - tdSql.execute(f"insert into t{i}(ts) values ({nowtime})") - tdSql.execute(f"insert into t{i}(ts) values ({nowtime-(per_table_rows+3)*10})") - tdSql.execute(f"insert into t{i}(ts) values ({nowtime+(per_table_rows+3)*10})") + tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime})") + tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime-(per_table_rows+3)*10})") + tdSql.execute(f"insert into {dbname}.t{i}(ts) values ({nowtime+(per_table_rows+3)*10})") self.mavg_current_query() self.mavg_error_query() @@ -664,67 +665,64 @@ class TDTestCase: tdDnodes.start(index) self.mavg_current_query() self.mavg_error_query() - tdSql.query("select mavg(1,1) from t1") + tdSql.query(f"select mavg(1,1) from {dbname}.t1") tdSql.checkRows(7) tdSql.checkData(0,0,1.000000000) tdSql.checkData(1,0,1.000000000) tdSql.checkData(5,0,1.000000000) - tdSql.query("select mavg(abs(c1),1) from t1") + tdSql.query(f"select mavg(abs(c1),1) from {dbname}.t1") tdSql.checkRows(4) def mavg_support_stable(self): - tdSql.query(" select mavg(1,3) from stb1 ") + tdSql.query(f" select mavg(1,3) from {dbname}.stb1 ") tdSql.checkRows(68) tdSql.checkData(0,0,1.000000000) - tdSql.query("select mavg(c1,3) from stb1 partition by tbname ") + tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by tbname ") tdSql.checkRows(20) - # tdSql.query("select mavg(st1,3) from stb1 partition by tbname") - # tdSql.checkRows(38) - tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") + tdSql.query(f"select mavg(st1,3) from {dbname}.stb1 partition by tbname") + tdSql.checkRows(50) + tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname") tdSql.checkRows(20) - tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") + tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname") tdSql.checkRows(20) - tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") + tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname") tdSql.checkRows(20) - # # bug need fix - # tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname slimit 1 ") - # tdSql.checkRows(2) - # tdSql.error("select mavg(st1+c1,3) from stb1 partition by tbname limit 1 ") # bug need fix - tdSql.query("select mavg(st1+c1,3) from stb1 partition by tbname") + tdSql.query(f"select mavg(st1+c1,3) from {dbname}.stb1 partition by tbname") tdSql.checkRows(20) # bug need fix - # tdSql.query("select tbname , mavg(c1,3) from stb1 partition by tbname") - # tdSql.checkRows(38) - # tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname") - # tdSql.checkRows(38) - # tdSql.query("select tbname , mavg(st1,3) from stb1 partition by tbname slimit 1") - # tdSql.checkRows(2) + tdSql.query(f"select tbname , mavg(c1,3) from {dbname}.stb1 partition by tbname") + tdSql.checkRows(20) + tdSql.query(f"select tbname , mavg(st1,3) from {dbname}.stb1 partition by tbname") + tdSql.checkRows(50) + tdSql.query(f"select tbname , mavg(st1,3) from {dbname}.stb1 partition by tbname slimit 1") + tdSql.checkRows(5) # partition by tags - # tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1") - # tdSql.checkRows(38) - # tdSql.query("select mavg(c1,3) from stb1 partition by st1") - # tdSql.checkRows(38) - # tdSql.query("select st1 , mavg(c1,3) from stb1 partition by st1 slimit 1") - # tdSql.checkRows(2) - # tdSql.query("select mavg(c1,3) from stb1 partition by st1 slimit 1") - # tdSql.checkRows(2) + tdSql.query(f"select st1 , mavg(c1,3) from {dbname}.stb1 partition by st1") + tdSql.checkRows(20) + tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by st1") + tdSql.checkRows(20) + tdSql.query(f"select st1 , mavg(c1,3) from {dbname}.stb1 partition by st1 slimit 1") + tdSql.checkRows(2) + tdSql.query(f"select mavg(c1,3) from {dbname}.stb1 partition by st1 slimit 1") + tdSql.checkRows(2) # partition by col - # tdSql.query("select c1 , mavg(c1,3) from stb1 partition by c1") - # tdSql.checkRows(38) - # tdSql.query("select mavg(c1 ,3) from stb1 partition by c1") - # tdSql.checkRows(38) - # tdSql.query("select c1 , mavg(c1,3) from stb1 partition by st1 slimit 1") - # tdSql.checkRows(2) - # tdSql.query("select diff(c1) from stb1 partition by st1 slimit 1") - # tdSql.checkRows(2) + tdSql.query(f"select c1 , mavg(c1,3) from {dbname}.stb1 partition by c1") + tdSql.checkRows(0) + tdSql.query(f"select c1 , mavg(c1,1) from {dbname}.stb1 partition by c1") + tdSql.checkRows(40) + tdSql.query(f"select c1, c2, c3, c4, mavg(c1,3) from {dbname}.stb1 partition by tbname ") + tdSql.checkRows(20) + tdSql.query(f"select c1, c2, c3, c4, mavg(123,3) from {dbname}.stb1 partition by tbname ") + tdSql.checkRows(50) + def run(self): import traceback diff --git a/tests/system-test/2-query/sample.py b/tests/system-test/2-query/sample.py index 46d2062341..45be0ef8ab 100644 --- a/tests/system-test/2-query/sample.py +++ b/tests/system-test/2-query/sample.py @@ -873,7 +873,7 @@ class TDTestCase: # bug need fix tdSql.query("select c1 ,t1, sample(c1,2) from db.stb1 partition by c1 ") tdSql.query("select sample(c1,2) from db.stb1 partition by c1 ") - # tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ") + tdSql.query("select c1 ,ind, sample(c1,2) from sample_db.st partition by c1 ") def run(self): import traceback diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index e9fbba86f9..1e958bdb29 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -113,7 +113,7 @@ python3 ./test.py -f 2-query/hyperloglog.py -R python3 ./test.py -f 2-query/interp.py python3 ./test.py -f 2-query/interp.py -R python3 ./test.py -f 2-query/irate.py -# python3 ./test.py -f 2-query/irate.py -R +python3 ./test.py -f 2-query/irate.py -R python3 ./test.py -f 2-query/join.py python3 ./test.py -f 2-query/join.py -R python3 ./test.py -f 2-query/last_row.py @@ -169,7 +169,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py python3 ./test.py -f 2-query/elapsed.py python3 ./test.py -f 2-query/csum.py -#python3 ./test.py -f 2-query/mavg.py +python3 ./test.py -f 2-query/mavg.py python3 ./test.py -f 2-query/sample.py python3 ./test.py -f 2-query/function_diff.py python3 ./test.py -f 2-query/unique.py @@ -358,7 +358,7 @@ python3 ./test.py -f 2-query/interp.py -Q 2 python3 ./test.py -f 2-query/avg.py -Q 2 # python3 ./test.py -f 2-query/elapsed.py -Q 2 python3 ./test.py -f 2-query/csum.py -Q 2 -#python3 ./test.py -f 2-query/mavg.py -Q 2 +python3 ./test.py -f 2-query/mavg.py -Q 2 python3 ./test.py -f 2-query/sample.py -Q 2 python3 ./test.py -f 2-query/function_diff.py -Q 2 python3 ./test.py -f 2-query/unique.py -Q 2 @@ -445,7 +445,7 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py -Q 3 # python3 ./test.py -f 2-query/avg.py -Q 3 # python3 ./test.py -f 2-query/elapsed.py -Q 3 python3 ./test.py -f 2-query/csum.py -Q 3 -#python3 ./test.py -f 2-query/mavg.py -Q 3 +python3 ./test.py -f 2-query/mavg.py -Q 3 python3 ./test.py -f 2-query/sample.py -Q 3 python3 ./test.py -f 2-query/function_diff.py -Q 3 python3 ./test.py -f 2-query/unique.py -Q 3