From 503c93585d00eb9935ef1b0c60d14084537d08e6 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Mon, 1 Apr 2024 14:44:27 +0800 Subject: [PATCH 01/16] enh: update taosws feature to rustls --- tools/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index a1c5405253..518e97a219 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -57,7 +57,7 @@ IF (TD_WEBSOCKET) COMMAND git clean -f -d BUILD_COMMAND COMMAND cargo update - COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored + COMMAND cargo build --release -p taos-ws-sys --features rustls INSTALL_COMMAND COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include From 529eadb921de275e2622129f2fc817b5fb2b6228 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Mon, 1 Apr 2024 16:15:26 +0800 Subject: [PATCH 02/16] enh: update taosws feature to rustls --- tools/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 518e97a219..188abb4b58 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -18,7 +18,7 @@ IF (TD_WEBSOCKET) COMMAND git clean -f -d BUILD_COMMAND COMMAND cargo update - COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features native-tls + COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features rustls INSTALL_COMMAND COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include @@ -37,7 +37,7 @@ IF (TD_WEBSOCKET) COMMAND git clean -f -d BUILD_COMMAND COMMAND cargo update - COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored + COMMAND cargo build --release -p taos-ws-sys --features rustls INSTALL_COMMAND COMMAND cp target/release/taosws.dll ${CMAKE_BINARY_DIR}/build/lib COMMAND cp target/release/taosws.dll.lib ${CMAKE_BINARY_DIR}/build/lib/taosws.lib From 9ca84091dfd6b8bbfa6fbf59f4b053f98a3570ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 1 Apr 2024 16:31:10 +0800 Subject: [PATCH 03/16] fix(stream): reset the ready upstream counter after task-reset. 
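streamTaskClearCheckInfo() already rewinds numOfNotReady, transId and
dispatchCheckpointTrigger when the checkpoint info is cleared, but it left
chkInfo.downstreamAlignNum untouched, so a task reset issued while a
checkpoint trigger was still in flight could carry a stale alignment count
into the next checkpoint round and miscount the ready upstream tasks.
Clear the counter together with the rest of the checkpoint bookkeeping.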
--- source/libs/stream/src/streamCheckpoint.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f58c72eded..7f52c5d2f0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -278,6 +278,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.numOfNotReady = 0; pTask->chkInfo.transId = 0; pTask->chkInfo.dispatchCheckpointTrigger = false; + pTask->chkInfo.downstreamAlignNum = 0; streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks if (clearChkpReadyMsg) { From 1c56c5ab83a78ccec3042ecc5f80e0fd304fe2b1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 1 Apr 2024 19:16:54 +0800 Subject: [PATCH 04/16] fix:delete the file that testing JSON compression rate --- tests/system-test/buildJson.py | 243 --------------------------------- 1 file changed, 243 deletions(-) delete mode 100644 tests/system-test/buildJson.py diff --git a/tests/system-test/buildJson.py b/tests/system-test/buildJson.py deleted file mode 100644 index 6e9e9f83e1..0000000000 --- a/tests/system-test/buildJson.py +++ /dev/null @@ -1,243 +0,0 @@ -# 写一段python代码,生成一个JSON串,json 串为数组,数组长度为10000,每个元素为包含4000个key-value对的JSON字符串,json 数组里每个元素里的4000个key不相同,元素之间使用相同的key,key值为英文单词,value 为int值,且value 的范围是[0, 256]。把json串紧凑形式写入文件,把json串存入parquet文件中,把json串写入avro文件中,把json串写入到postgre sql表中,表有两列第一列主int类型主键,第二列为json类型,数组的每个元素写入json类型里 -import csv -import json -import os -import random -import string -import time - -from faker import Faker -import pandas as pd -import pyarrow as pa -import pyarrow.parquet as pq -import fastavro -import psycopg2 -from psycopg2.extras import Json - - -def get_dir_size(start_path='.'): - total = 0 - for dirpath, dirs, files in os.walk(start_path): - for f in files: - fp = os.path.join(dirpath, f) - # 获取文件大小并累加到total上 - total += os.path.getsize(fp) - return total - - -def to_avro_record(obj): - return {key: value for key, value in obj.items()} - - -def generate_random_string(length): - return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) - - -def generate_random_values(t): - if t == 0: - return random.randint(-255, 256) - elif t == 1: - return random.randint(-2100000000, 2100000000) - elif t == 2: - return random.uniform(-10000.0, 10000.0) - elif t == 3: - return generate_random_string(10) - elif t == 4: - return random.choice([True, False]) - - -def generate_json_object(key_set, value_set): - values = [generate_random_values(t) for t in value_set] - return dict(zip(key_set, values)) - - -def generate_json_array(keys, values, array_length): - return [generate_json_object(keys, values) for _ in range(array_length)] - - -def write_parquet_file(parquet_file, json_array): - df = pd.DataFrame(json_array) - table = pa.Table.from_pandas(df) - pq.write_table(table, parquet_file + ".parquet") - - -def write_json_file(json_file, json_array): - with open(json_file + ".json", 'w') as f: - json.dump(json_array, f, separators=(',', ':')) - - -def generate_avro_schema(k, t): - if t == 0: - return {"name": k, "type": "int", "logicalType": "int"} - elif t == 1: - return {"name": k, "type": "int", "logicalType": "int"} - elif t == 2: - return {"name": k, "type": "float"} - elif t == 3: - return {"name": k, "type": "string"} - elif t == 4: - return {"name": k, "type": "boolean"} - - -def write_avro_file(avro_file, json_array, keys, values): - k = list(json_array[0].keys()) - - if keys != k: - raise 
ValueError("keys and values should have the same length") - - avro_schema = { - "type": "record", - "name": "MyRecord", - "fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()] - } - - avro_records = [to_avro_record(obj) for obj in json_array] - with open(avro_file + ".avro", 'wb') as f: - fastavro.writer(f, avro_schema, avro_records) - - -def write_pg_file(json_array): - conn_str = "dbname=mydatabase user=myuser host=localhost" - conn = psycopg2.connect(conn_str) - cur = conn.cursor() - - cur.execute("drop table if exists my_table") - conn.commit() - - # 创建表(如果不存在) - cur.execute(""" - CREATE TABLE IF NOT EXISTS my_table ( - id SERIAL PRIMARY KEY, - json_data JSONB - ); - """) - conn.commit() - - # 执行SQL查询 - cur.execute("SELECT count(*) FROM my_table") - # 获取查询结果 - rows = cur.fetchall() - # 打印查询结果 - for row in rows: - print("rows before:", row[0]) - - # 插入数据 - for idx, json_obj in enumerate(json_array): - # print(json.dumps(json_obj)) - cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),)) - - conn.commit() # 提交事务 - - # 执行SQL查询 - cur.execute("SELECT count(*) FROM my_table") - # 获取查询结果 - rows = cur.fetchall() - # 打印查询结果 - for row in rows: - print("rows after:", row[0]) - - # # 执行SQL查询 - # cur.execute("SELECT pg_relation_size('my_table')") - # # 获取查询结果 - # rows = cur.fetchall() - # # 打印查询结果 - # size = 0 - # for row in rows: - # size = row[0] - # print("table size:", row[0]) - - # 关闭游标和连接 - cur.close() - conn.close() - -def read_parquet_file(parquet_file): - table = pq.read_table(parquet_file + ".parquet") - df = table.to_pandas() - print(df) - - -def read_avro_file(avg_file): - with open(avg_file + ".avro", 'rb') as f: - reader = fastavro.reader(f) - - for record in reader: - print(record) - - -def read_json_file(csv_file): - with open(csv_file + ".json", 'r') as f: - data = json.load(f) - print(data) - - -def main(): - key_length = 7 - key_sizes = 4000 - row_sizes = 10000 - file_name = "output" - - # cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)] - cases = [(2, 2), (3, 3), (0, 4)] - - for data in cases: - begin, end = data - print(f"执行类型:{begin}-{end}") - - N = 2 - for _ in range(N): - - t0 = time.time() - - keys = [generate_random_string(key_length) for _ in range(key_sizes)] - values = [random.randint(begin, end) for _ in range(key_sizes)] - # 生成JSON数组 - json_array = generate_json_array(keys, values, row_sizes) - - t1 = time.time() - - write_json_file(file_name, json_array) - - t2 = time.time() - - write_parquet_file(file_name, json_array) - - t3 = time.time() - - write_avro_file(file_name, json_array, keys, values) - - t4 = time.time() - - size = write_pg_file(json_array) - - t5 = time.time() - - print("生成json 速度:", t2 - t0, "文件大小:", os.path.getsize(file_name + ".json")) - print("parquet 速度:", t3 - t2, "文件大小:", os.path.getsize(file_name + ".parquet")) - print("avro 速度:", t4 - t3, "文件大小:", os.path.getsize(file_name + ".avro")) - print("pg json 速度:", t5 - t4, "文件大小:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024) - - # read_json_file(file_name) - # read_parquet_file(file_name) - # read_avro_file(file_name) - print(f"\n---------------\n") - -if __name__ == "__main__": - main() - -# 压缩文件 -# import os -# -# import lz4.frame -# -# -# files =["output.json", "output.parquet", "output.avro"] -# def compress_file(input_path, output_path): -# with open(input_path, 'rb') as f_in: -# compressed_data = lz4.frame.compress(f_in.read()) -# -# with open(output_path, 'wb') as f_out: -# 
f_out.write(compressed_data) -# -# for file in files: -# compress_file(file, file + ".lz4") -# print(file, "origin size:", os.path.getsize(file), " after lsz size:", os.path.getsize(file + ".lz4")) From 174b0a104e5cab39e3eaa953dc579c9f39cdef54 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 1 Apr 2024 19:59:30 +0800 Subject: [PATCH 05/16] fix:case error --- source/dnode/mnode/impl/src/mndConsumer.c | 8 ++++---- tests/system-test/8-stream/stream_basic.py | 19 +++++++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 52671f6b66..ed9333f480 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -292,16 +292,16 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){ int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp); if (tlen <= 0){ - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_TMQ_INVALID_MSG; } void *buf = rpcMallocCont(tlen); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){ + if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){ rpcFreeCont(buf); - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_TMQ_INVALID_MSG; } pMsg->info.rsp = buf; pMsg->info.rspLen = tlen; @@ -316,7 +316,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = NULL; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index 5167423ea3..ff16bee787 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -96,9 +96,28 @@ class TDTestCase: time.sleep(2) tdSql.query("select * from sta") tdSql.checkRows(3) + tdSql.query("select tbname from sta order by tbname") + if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'): + tdLog.exit("error1") + + if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'): + tdLog.exit("error2") + + if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'): + tdLog.exit("error3") tdSql.query("select * from stb") tdSql.checkRows(3) + tdSql.query("select tbname from stb order by tbname") + if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'): + tdLog.exit("error4") + + if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'): + tdLog.exit("error5") + + if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'): + tdLog.exit("error6") + # run def run(self): self.case1() From 9125a28061d90e51e7a4fc180909a853af3bcbca Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Tue, 2 Apr 2024 09:13:55 +0800 Subject: [PATCH 06/16] docs: merge example code to 3.0 --- .../rust/nativeexample/examples/query.rs | 66 +++++++ .../rust/nativeexample/examples/schemaless.rs | 80 +++++++++ .../rust/nativeexample/examples/stmt.rs | 37 ++++ .../rust/nativeexample/examples/tmq.rs | 166 ++++++++++++++++++ 4 files changed, 349 insertions(+) create mode 100644 docs/examples/rust/nativeexample/examples/query.rs create mode 100644 docs/examples/rust/nativeexample/examples/schemaless.rs create mode 100644 docs/examples/rust/nativeexample/examples/stmt.rs create mode 100644 docs/examples/rust/nativeexample/examples/tmq.rs diff --git a/docs/examples/rust/nativeexample/examples/query.rs b/docs/examples/rust/nativeexample/examples/query.rs new file mode 100644 
index 0000000000..dfe55e8749 --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/query.rs @@ -0,0 +1,66 @@ +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let dsn = "taos://localhost:6030"; + let builder = TaosBuilder::from_dsn(dsn)?; + + let taos = builder.build()?; + + // ANCHOR: create_db_and_table + let db = "power"; + // create database + taos.exec_many([ + format!("DROP DATABASE IF EXISTS `{db}`"), + format!("CREATE DATABASE `{db}`"), + format!("USE `{db}`"), + ]) + .await?; + + // create table + taos.exec_many([ + // create super table + "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \ + TAGS (`groupid` INT, `location` BINARY(24))", + ]).await?; + // ANCHOR_END: create_db_and_table + + // ANCHOR: insert_data + let inserted = taos.exec("INSERT INTO " + + "power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 219, 0.31000) " + + "(NOW + 2a, 12.60000, 218, 0.33000) " + + "(NOW + 3a, 12.30000, 221, 0.31000) " + + "power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " + + "VALUES " + + "(NOW + 1a, 10.30000, 218, 0.25000) ").await?; + + println!("inserted: {} rows", inserted); + // ANCHOR_END: insert_data + + // ANCHOR: query_data + let mut result = taos.query("SELECT * FROM power.meters").await?; + + for field in result.fields() { + println!("got field: {}", field.name()); + } + + let mut rows = result.rows(); + let mut nrows = 0; + while let Some(row) = rows.try_next().await? { + for (col, (name, value)) in row.enumerate() { + println!( + "[{}] got value in col {} (named `{:>8}`): {}", + nrows, col, name, value + ); + } + nrows += 1; + } + // ANCHOR_END: query_data + + // ANCHOR: query_with_req_id + let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?; + // ANCHOR_END: query_with_req_id + +} diff --git a/docs/examples/rust/nativeexample/examples/schemaless.rs b/docs/examples/rust/nativeexample/examples/schemaless.rs new file mode 100644 index 0000000000..44ce0fe694 --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/schemaless.rs @@ -0,0 +1,80 @@ +use taos_query::common::SchemalessPrecision; +use taos_query::common::SchemalessProtocol; +use taos_query::common::SmlDataBuilder; + +use crate::AsyncQueryable; +use crate::AsyncTBuilder; +use crate::TaosBuilder; + +async fn put() -> anyhow::Result<()> { + std::env::set_var("RUST_LOG", "taos=debug"); + pretty_env_logger::init(); + let dsn = + std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string()); + log::debug!("dsn: {:?}", &dsn); + + let client = TaosBuilder::from_dsn(dsn)?.build().await?; + + let db = "power"; + + client.exec(format!("drop database if exists {db}")).await?; + + client + .exec(format!("create database if not exists {db}")) + .await?; + + // should specify database before insert + client.exec(format!("use {db}")).await?; + + // SchemalessProtocol::Line + let data = [ + "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000", + ] + .map(String::from) + .to_vec(); + + let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Line) + .precision(SchemalessPrecision::Millisecond) + .data(data.clone()) + .ttl(1000) + .req_id(100u64) + .build()?; + assert_eq!(client.put(&sml_data).await?, ()); + + // SchemalessProtocol::Telnet + let data = [ + "meters.current 1648432611249 10.3 location=California.SanFrancisco group=2", + ] + .map(String::from) + 
.to_vec(); + + let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Telnet) + .precision(SchemalessPrecision::Millisecond) + .data(data.clone()) + .ttl(1000) + .req_id(200u64) + .build()?; + assert_eq!(client.put(&sml_data).await?, ()); + + // SchemalessProtocol::Json + let data = [ + r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"# + ] + .map(String::from) + .to_vec(); + + let sml_data = SmlDataBuilder::default() + .protocol(SchemalessProtocol::Json) + .precision(SchemalessPrecision::Millisecond) + .data(data.clone()) + .ttl(1000) + .req_id(300u64) + .build()?; + assert_eq!(client.put(&sml_data).await?, ()); + + client.exec(format!("drop database if exists {db}")).await?; + + Ok(()) +} diff --git a/docs/examples/rust/nativeexample/examples/stmt.rs b/docs/examples/rust/nativeexample/examples/stmt.rs new file mode 100644 index 0000000000..0194eccdf1 --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/stmt.rs @@ -0,0 +1,37 @@ +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let taos = TaosBuilder::from_dsn("taos://")?.build().await?; + + taos.exec("DROP DATABASE IF EXISTS power").await?; + taos.create_database("power").await?; + taos.use_database("power").await?; + taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; + + let mut stmt = Stmt::init(&taos).await?; + stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?; + + const NUM_TABLES: usize = 10; + const NUM_ROWS: usize = 10; + for i in 0..NUM_TABLES { + let table_name = format!("d{}", i); + let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)]; + stmt.set_tbname_tags(&table_name, &tags).await?; + for j in 0..NUM_ROWS { + let values = vec![ + ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]), + ColumnView::from_floats(vec![10.3 + j as f32]), + ColumnView::from_ints(vec![219 + j as i32]), + ColumnView::from_floats(vec![0.31 + j as f32]), + ]; + stmt.bind(&values).await?; + } + stmt.add_batch().await?; + } + + // execute. 
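+    // Everything bound above is flushed by this one call; execute() is
+    // expected to return the total number of rows written, which the
+    // assert below checks against NUM_TABLES * NUM_ROWS (10 x 10 = 100).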
+ let rows = stmt.execute().await?; + assert_eq!(rows, NUM_TABLES * NUM_ROWS); + Ok(()) +} diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs new file mode 100644 index 0000000000..764c0c1fc8 --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -0,0 +1,166 @@ +use std::time::Duration; +use std::str::FromStr; + +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + pretty_env_logger::formatted_timed_builder() + .filter_level(log::LevelFilter::Info) + .init(); + use taos_query::prelude::*; + let dsn = "taos://localhost:6030".to_string(); + log::info!("dsn: {}", dsn); + let mut dsn = Dsn::from_str(&dsn)?; + + let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; + + // prepare database and table + taos.exec_many([ + "drop topic if exists topic_meters", + "drop database if exists power", + "create database if not exists power WAL_RETENTION_PERIOD 86400", + "use power", + + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))", + + "create table if not exists power.d001 using power.meters tags(1,'location')", + + ]) + .await?; + + taos.exec_many([ + "drop database if exists db2", + "create database if not exists db2 wal_retention_period 3600", + "use db2", + ]) + .await?; + + // ANCHOR: create_topic + taos.exec_many([ + "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters", + ]) + .await?; + // ANCHOR_END: create_topic + + // ANCHOR: create_consumer + dsn.params.insert("group.id".to_string(), "abc".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + + let builder = TmqBuilder::from_dsn(&dsn)?; + let mut consumer = builder.build().await?; + // ANCHOR_END: create_consumer + + // ANCHOR: subscribe + consumer.subscribe(["topic_meters"]).await?; + // ANCHOR_END: subscribe + + // ANCHOR: consume + { + let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); + + while let Some((offset, message)) = stream.try_next().await? { + + let topic: &str = offset.topic(); + let database = offset.database(); + let vgroup_id = offset.vgroup_id(); + log::debug!( + "topic: {}, database: {}, vgroup_id: {}", + topic, + database, + vgroup_id + ); + + match message { + MessageSet::Meta(meta) => { + log::info!("Meta"); + let raw = meta.as_raw_meta().await?; + taos.write_raw_meta(&raw).await?; + + let json = meta.as_json_meta().await?; + let sql = json.to_string(); + if let Err(err) = taos.exec(sql).await { + println!("maybe error: {}", err); + } + } + MessageSet::Data(data) => { + log::info!("Data"); + while let Some(data) = data.fetch_raw_block().await? { + log::debug!("data: {:?}", data); + } + } + MessageSet::MetaData(meta, data) => { + log::info!("MetaData"); + let raw = meta.as_raw_meta().await?; + taos.write_raw_meta(&raw).await?; + + let json = meta.as_json_meta().await?; + let sql = json.to_string(); + if let Err(err) = taos.exec(sql).await { + println!("maybe error: {}", err); + } + + while let Some(data) = data.fetch_raw_block().await? 
{ + log::debug!("data: {:?}", data); + } + } + } + consumer.commit(offset).await?; + } + } + // ANCHOR_END: consume + + // ANCHOR: assignments + let assignments = consumer.assignments().await.unwrap(); + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + + // seek offset + for topic_vec_assignment in assignments { + let topic = &topic_vec_assignment.0; + let vec_assignment = topic_vec_assignment.1; + for assignment in vec_assignment { + let vgroup_id = assignment.vgroup_id(); + let current = assignment.current_offset(); + let begin = assignment.begin(); + let end = assignment.end(); + log::debug!( + "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + topic, + vgroup_id, + current, + begin, + end + ); + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; + if res.is_err() { + log::error!("seek offset error: {:?}", res); + let a = consumer.assignments().await.unwrap(); + log::error!("assignments: {:?}", a); + } + // ANCHOR_END: seek_offset + } + + let topic_assignment = consumer.topic_assignment(topic).await; + log::debug!("topic assignment: {:?}", topic_assignment); + } + + // after seek offset + let assignments = consumer.assignments().await.unwrap(); + log::info!("after seek offset assignments: {:?}", assignments); + + // ANCHOR: unsubscribe + consumer.unsubscribe().await; + // ANCHOR_END: unsubscribe + + tokio::time::sleep(Duration::from_secs(1)).await; + + taos.exec_many([ + "drop database db2", + "drop topic topic_meters", + "drop database power", + ]) + .await?; + Ok(()) +} From aff681ac64a948c5f5e2b8f8e5b150724992680f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 10:43:38 +0800 Subject: [PATCH 07/16] fix(tsdb):set correct varchar type compare function. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 5469eae1cd..2e6a0a43e9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -107,7 +107,21 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) { if (p1->numOfPKs == 0) { return 0; } else { - return comparFn(&p1->pks[0].val, &p2->pks[0].val); + if (IS_VAR_DATA_TYPE(p1->pks[0].type)) { + int32_t len = TMIN(p1->pks[0].nData, p2->pks[0].nData); + int32_t ret = strncmp((char*)p1->pks[0].pData, (char*)p2->pks[0].pData, len); + if (ret == 0) { + if (p1->pks[0].nData == p2->pks[0].nData) { + return 0; + } else { + return p1->pks[0].nData > p2->pks[0].nData?1:-1; + } + } else { + return ret > 0? 
1:-1; + } + } else { + return comparFn(&p1->pks[0].val, &p2->pks[0].val); + } } } From 771c6940b1adb300b0809f151d2078244ea6d310 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 2 Apr 2024 13:39:41 +0800 Subject: [PATCH 08/16] fix: initialize pk type and pk data of output SFirstLastRes when merge --- source/libs/function/src/builtinsimpl.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fafc313afc..70e906c6ec 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2821,11 +2821,13 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p pOutput->isNull = pInput->isNull; pOutput->ts = pInput->ts; pOutput->bytes = pInput->bytes; + pOutput->pkType = pInput->pkType; memcpy(pOutput->buf, pInput->buf, pOutput->bytes); if (pInput->pkData) { pOutput->pkBytes = pInput->pkBytes; memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes); + pOutput->pkData = pOutput->buf + pOutput->bytes; } return TSDB_CODE_SUCCESS; } From 8fe7478932388af8d0179afc3c01c6669b64afd5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 2 Apr 2024 14:24:32 +0800 Subject: [PATCH 09/16] fix: TD-29352 --- source/common/src/tdataformat.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 7de685d9b1..ff25954fc1 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -1061,7 +1061,6 @@ _exit: static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag) { int32_t code = 0; - SKVIdx *pKVIdx = (SKVIdx *)pRow->data; uint8_t *pv = NULL; int32_t iColData = 0; SColData *pColData = &aColData[iColData]; @@ -1069,6 +1068,14 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo STColumn *pTColumn = &pTSchema->columns[iTColumn]; int32_t iCol = 0; + // primary keys + uint8_t *data = pRow->data; + SPrimaryKeyIndex index; + for (int32_t i = 0; i < pRow->numOfPKs; i++) { + data += tGetPrimaryKeyIndex(data, &index); + } + + SKVIdx *pKVIdx = (SKVIdx *)data; if (pRow->flag & KV_FLG_LIT) { pv = pKVIdx->idx + pKVIdx->nCol; } else if (pRow->flag & KV_FLG_MID) { From 77d912c2102985df895ca2ed8e0226653b6c33dc Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 2 Apr 2024 14:39:13 +0800 Subject: [PATCH 10/16] fix: primary key should not be null or none --- include/libs/nodes/querynodes.h | 1 + include/libs/qcom/query.h | 1 + source/libs/parser/src/parInsertSql.c | 23 +++++++++++++++++----- source/libs/parser/src/parInsertStmt.c | 12 ++++++++++- source/libs/parser/src/parTranslater.c | 17 ++++++++++++---- source/libs/planner/src/planLogicCreater.c | 1 + source/libs/qcom/src/querymsg.c | 8 ++++++++ 7 files changed, 53 insertions(+), 10 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index cdbeda0012..22c5e74910 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -85,6 +85,7 @@ typedef struct SColumnNode { char colName[TSDB_COL_NAME_LEN]; int16_t dataBlockId; int16_t slotId; + int16_t numOfPKs; bool tableHasPk; bool isPk; } SColumnNode; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 0b192d5593..47783953c4 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -76,6 +76,7 @@ typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema 
uint8_t precision; // the number of precision col_id_t numOfColumns; // the number of columns + int16_t numOfPKs; int32_t rowSize; // row size of the schema } STableComInfo; diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 1a41ba5fb9..94c0ca99e1 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -183,6 +183,8 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E pBoundInfo->numOfBound = 0; + bool hasPK = pTableMeta->tableInfo.numOfPKs; + int16_t numOfBoundPKs = 0; int16_t lastColIdx = -1; // last column found int32_t code = TSDB_CODE_SUCCESS; while (TSDB_CODE_SUCCESS == code) { @@ -219,14 +221,20 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E pUseCols[index] = true; pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; ++pBoundInfo->numOfBound; + if (hasPK && (pSchema[index].flags & COL_IS_KEY)) ++numOfBoundPKs; } } - if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) { - code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null"); + if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType)) { + if (!pUseCols[0]) { + code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column should not be null"); + } + if (numOfBoundPKs != pTableMeta->tableInfo.numOfPKs) { + code = buildInvalidOperationMsg(&pCxt->msg, "primary key column should not be none"); + } } if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) { - code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null"); + code = buildInvalidOperationMsg(&pCxt->msg, "tbname column should not be null"); } taosMemoryFree(pUseCols); @@ -469,13 +477,15 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, char* endptr = NULL; int32_t code = TSDB_CODE_SUCCESS; +#if 0 if (isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z); + return buildSyntaxErrMsg(pMsgBuf, "primary timestamp column can not be null", pToken->z); } return TSDB_CODE_SUCCESS; } +#endif // strcpy(val->colName, pSchema->name); val->cid = pSchema->colId; @@ -1643,7 +1653,10 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type); if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z); + return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp column should not be null", pToken->z); + } + if (pSchema->flags & COL_IS_KEY) { + return buildSyntaxErrMsg(&pCxt->msg, "primary key column should not be null", pToken->z); } pVal->flag = CV_FLAG_NULL; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index a88aec20b3..fb765dc3de 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -267,6 +267,11 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in pBind = bind + c; } + if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){ + code = buildInvalidOperationMsg(&pBuf, "primary 
key column should not be null"); + goto _return; + } + code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); if (code) { goto _return; @@ -313,7 +318,12 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu pBind = bind; } - tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1); + if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) { + code = buildInvalidOperationMsg(&pBuf, "primary key column should not be null"); + goto _return; + } + + tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1); qDebug("stmt col %d bind %d rows data", colIdx, rowNum); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b08552fc3d..830b7f7396 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -955,6 +955,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p } pCol->tableHasPk = hasPkInTable(pTable->pMeta); pCol->isPk = (pCol->tableHasPk) && (pColSchema->flags & COL_IS_KEY); + pCol->numOfPKs = pTable->pMeta->tableInfo.numOfPKs; } static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) { @@ -5084,9 +5085,11 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns"); } - SNode* pPrimaryKeyExpr = NULL; - SNode* pBoundCol = NULL; - SNode* pProj = NULL; + SNode* pPrimaryKeyExpr = NULL; + SNode* pBoundCol = NULL; + SNode* pProj = NULL; + int16_t numOfPKs = 0; + int16_t numOfBoundPKs = 0; FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) { SColumnNode* pCol = (SColumnNode*)pBoundCol; SExprNode* pExpr = (SExprNode*)pProj; @@ -5102,12 +5105,18 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns snprintf(pExpr->aliasName, sizeof(pExpr->aliasName), "%s", pCol->colName); if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { pPrimaryKeyExpr = (SNode*)pExpr; + numOfPKs = pCol->numOfPKs; } + if (pCol->isPk) ++numOfBoundPKs; } if (NULL == pPrimaryKeyExpr) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, - "Primary timestamp column can not be null"); + "Primary timestamp column should not be null"); + } + + if (numOfBoundPKs != numOfPKs) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Primary key column should not be none"); } return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b5236fee9e..100251b565 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -294,6 +294,7 @@ static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema, const STa pCol->colType = COLUMN_TYPE_COLUMN; pCol->isPk = pSchema->flags & COL_IS_KEY; pCol->tableHasPk = hasPkInTable(pMeta); + pCol->numOfPKs = pMeta->tableInfo.numOfPKs; strcpy(pCol->colName, pSchema->name); return (SNode*)pCol; } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index bf87803908..7948bbbceb 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -421,8 +421,16 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp 
*msg, bool isStb, STableMeta * memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total); + bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY); for (int32_t i = 0; i < msg->numOfColumns; ++i) { pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; + if (hasPK && (i > 0)) { + if ((pTableMeta->schema[i].flags & COL_IS_KEY)) { + ++pTableMeta->tableInfo.numOfPKs; + } else { + hasPK = false; + } + } } qDebug("table %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s stb %s suid %" PRIx64 " sver %d tver %d" From dc608121833321c04f8a1f113601d3caa477bfb4 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 2 Apr 2024 15:04:13 +0800 Subject: [PATCH 11/16] chore: unify the prompt msg --- source/libs/parser/src/parInsertSql.c | 10 +++++----- source/libs/parser/src/parInsertStmt.c | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 94c0ca99e1..71d832dc1f 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -227,10 +227,10 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType)) { if (!pUseCols[0]) { - code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column should not be null"); + code = buildInvalidOperationMsg(&pCxt->msg, "Primary timestamp column should not be null"); } if (numOfBoundPKs != pTableMeta->tableInfo.numOfPKs) { - code = buildInvalidOperationMsg(&pCxt->msg, "primary key column should not be none"); + code = buildInvalidOperationMsg(&pCxt->msg, "Primary key column should not be none"); } } if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) { @@ -480,7 +480,7 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema, #if 0 if (isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - return buildSyntaxErrMsg(pMsgBuf, "primary timestamp column can not be null", pToken->z); + return buildSyntaxErrMsg(pMsgBuf, "Primary timestamp column can not be null", pToken->z); } return TSDB_CODE_SUCCESS; @@ -1653,10 +1653,10 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type); if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) { if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp column should not be null", pToken->z); + return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z); } if (pSchema->flags & COL_IS_KEY) { - return buildSyntaxErrMsg(&pCxt->msg, "primary key column should not be null", pToken->z); + return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z); } pVal->flag = CV_FLAG_NULL; diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index fb765dc3de..59c5ce82ad 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -268,7 +268,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in } if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){ - code = buildInvalidOperationMsg(&pBuf, "primary key column should not be null"); + 
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); goto _return; } @@ -319,7 +319,7 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu } if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) { - code = buildInvalidOperationMsg(&pBuf, "primary key column should not be null"); + code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null"); goto _return; } From 5a546e37d71d7079c1e6d9a857331230cf93aaa6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 2 Apr 2024 16:10:42 +0800 Subject: [PATCH 12/16] fix(query): set correct forward step for twa query. --- source/libs/executor/src/timewindowoperator.c | 20 ++++++-- source/libs/function/src/builtinsimpl.c | 50 +++++++++---------- 2 files changed, 39 insertions(+), 31 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 51bfe716c8..f24b581ca2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -359,8 +359,8 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i } static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, - SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, - STimeWindow* win) { + int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols, + TSKEY blockEkey, STimeWindow* win) { int32_t order = pInfo->binfo.inputTsOrder; TSKEY actualEndKey = tsCols[endRowIndex]; @@ -378,7 +378,6 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx return true; } - int32_t nextRowIndex = endRowIndex + 1; ASSERT(nextRowIndex >= 0); TSKEY nextKey = tsCols[nextRowIndex]; @@ -517,9 +516,22 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP); if (!done) { int32_t endRowIndex = startPos + forwardRows - 1; + int32_t nextRowIndex = endRowIndex + 1; + + // duplicated ts row does not involve in the interpolation of end value for current time window + int32_t x = endRowIndex; + while(x >= 0) { + if (tsCols[x] == tsCols[x-1]) { + + x -= 1; + } else { + endRowIndex = x; + break; + } + } TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? 
pBlock->info.window.ekey : pBlock->info.window.skey; - bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); + bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fafc313afc..36216d5b6f 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -533,6 +533,24 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) { } } +static void forwardToNextDiffTsRow(SFuncInputRowIter* pIter, int32_t rowIndex) { + int32_t idx = rowIndex + 1; + while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[rowIndex]) { + ++idx; + } + pIter->rowIndex = idx; +} + +static void setInputRowInfo(SFuncInputRow* pRow, SFuncInputRowIter* pIter, int32_t rowIndex, bool setPk) { + pRow->ts = pIter->tsList[rowIndex]; + pRow->ts = pIter->tsList[rowIndex]; + pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, rowIndex); + pRow->pData = colDataGetData(pIter->pDataCol, rowIndex); + pRow->pPk = setPk? colDataGetData(pIter->pPkCol, rowIndex):NULL; + pRow->block = pIter->pSrcBlock; + pRow->rowIndex = rowIndex; +} + bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { if (pIter->hasPrev) { if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) { @@ -543,33 +561,19 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { while (pIter->tsList[idx] == pIter->prevBlockTsEnd) { ++idx; } - pRow->ts = pIter->tsList[idx]; - pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx); - pRow->pData = colDataGetData(pIter->pDataCol, idx); - pRow->pPk = colDataGetData(pIter->pPkCol, idx); - pRow->block = pIter->pSrcBlock; - pRow->rowIndex = idx; pIter->hasPrev = false; - pIter->rowIndex = idx + 1; + setInputRowInfo(pRow, pIter, idx, true); + forwardToNextDiffTsRow(pIter, idx); return true; } } else { if (pIter->rowIndex <= pIter->inputEndIndex) { - pRow->ts = pIter->tsList[pIter->rowIndex]; - pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex); - pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex); - pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex); - pRow->block = pIter->pSrcBlock; - pRow->rowIndex = pIter->rowIndex; + setInputRowInfo(pRow, pIter, pIter->rowIndex, true); TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex]; if (pIter->tsList[pIter->rowIndex] != tsEnd) { - int32_t idx = pIter->rowIndex + 1; - while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) { - ++idx; - } - pIter->rowIndex = idx; + forwardToNextDiffTsRow(pIter, pIter->rowIndex); } else { pIter->rowIndex = pIter->inputEndIndex + 1; } @@ -585,13 +589,7 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) { if (pIter->rowIndex <= pIter->inputEndIndex) { - pRow->ts = pIter->tsList[pIter->rowIndex]; - pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex); - pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex); - pRow->pPk = NULL; - pRow->block = pIter->pSrcBlock; - pRow->rowIndex = pIter->rowIndex; - + setInputRowInfo(pRow, pIter, pIter->rowIndex, false); ++pIter->rowIndex; return 
true; } else { @@ -5580,8 +5578,6 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); From adb6de0595d1c59cd9c01cc92523748a45453945 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 2 Apr 2024 17:33:11 +0800 Subject: [PATCH 13/16] fix: null value check for target column with pk --- include/util/taoserror.h | 1 + source/libs/executor/src/dataInserter.c | 12 +++++++++++- source/util/src/terror.c | 1 + 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index affa1f0345..3dc6e5333d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -766,6 +766,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_SECOND_COL_PK TAOS_DEF_ERROR_CODE(0, 0x2672) #define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673) #define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674) +#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 88fb60fc4c..2d481bd273 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -217,6 +217,11 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { + if ((pCol->flags & COL_IS_KEY)) { + qError("NULL value for primary key, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); + terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + goto _end; + } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); } else { @@ -240,10 +245,15 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (colDataIsNull_s(pColInfoData, j)) { if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { - qError("NULL value for primary key"); + qError("NULL value for primary timestamp key"); terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; goto _end; } + if ((pCol->flags & COL_IS_KEY)) { + qError("NULL value for primary key, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); + terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; + goto _end; + } SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type taosArrayPush(pVals, &cv); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a64c8642db..99666320f1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -628,6 +628,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY, "tag can not be prim TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be second column") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error") //planner From 6e56e61ad52a56b610df1f4bcc4c6cf8dad59110 
Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 2 Apr 2024 19:49:30 +0800 Subject: [PATCH 14/16] fix: target table has less primary keys than source --- source/libs/parser/src/parTranslater.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 830b7f7396..b06a685c48 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5088,7 +5088,8 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns SNode* pPrimaryKeyExpr = NULL; SNode* pBoundCol = NULL; SNode* pProj = NULL; - int16_t numOfPKs = 0; + int16_t numOfSourcePKs = 0; + int16_t numOfTargetPKs = 0; int16_t numOfBoundPKs = 0; FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) { SColumnNode* pCol = (SColumnNode*)pBoundCol; @@ -5105,7 +5106,8 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns snprintf(pExpr->aliasName, sizeof(pExpr->aliasName), "%s", pCol->colName); if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { pPrimaryKeyExpr = (SNode*)pExpr; - numOfPKs = pCol->numOfPKs; + numOfTargetPKs = pCol->numOfPKs; + numOfSourcePKs = ((SColumnNode*)pProj)->numOfPKs; } if (pCol->isPk) ++numOfBoundPKs; } @@ -5115,10 +5117,16 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns "Primary timestamp column should not be null"); } - if (numOfBoundPKs != numOfPKs) { + if (numOfBoundPKs != numOfTargetPKs) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Primary key column should not be none"); } + if (numOfTargetPKs < numOfSourcePKs) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, + "Target table has less primary keys:%" PRIi16 " than source:%" PRIi16, + numOfTargetPKs, numOfSourcePKs); + } + return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery); } From cc660de9d0ec8160c69b5e1193c7798c39ec3371 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 3 Apr 2024 08:14:30 +0800 Subject: [PATCH 15/16] chore: error prompt for null primary key column --- source/libs/executor/src/dataInserter.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 2d481bd273..dfe30c9f96 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -218,7 +218,7 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp ASSERT(pColInfoData->info.type == pCol->type); if (colDataIsNull_s(pColInfoData, j)) { if ((pCol->flags & COL_IS_KEY)) { - qError("NULL value for primary key, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); + qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; goto _end; } @@ -245,12 +245,12 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (colDataIsNull_s(pColInfoData, j)) { if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { - qError("NULL value for primary timestamp key"); + qError("Primary timestamp column should not be null"); terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; goto _end; } if ((pCol->flags & COL_IS_KEY)) { - qError("NULL value for primary key, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); + 
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type); terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL; goto _end; } From 62ab1b1c41a5e2436630ba29f0bfde6ca0a3845f Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 3 Apr 2024 09:39:58 +0800 Subject: [PATCH 16/16] chore: allow source table with primary key --- source/libs/parser/src/parTranslater.c | 8 -------- 1 file changed, 8 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b06a685c48..a45e4de4d8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5088,7 +5088,6 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns SNode* pPrimaryKeyExpr = NULL; SNode* pBoundCol = NULL; SNode* pProj = NULL; - int16_t numOfSourcePKs = 0; int16_t numOfTargetPKs = 0; int16_t numOfBoundPKs = 0; FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) { @@ -5107,7 +5106,6 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { pPrimaryKeyExpr = (SNode*)pExpr; numOfTargetPKs = pCol->numOfPKs; - numOfSourcePKs = ((SColumnNode*)pProj)->numOfPKs; } if (pCol->isPk) ++numOfBoundPKs; } @@ -5121,12 +5119,6 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Primary key column should not be none"); } - if (numOfTargetPKs < numOfSourcePKs) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, - "Target table has less primary keys:%" PRIi16 " than source:%" PRIi16, - numOfTargetPKs, numOfSourcePKs); - } - return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery); }