diff --git a/docs/en/12-taos-sql/05-insert.md b/docs/en/12-taos-sql/05-insert.md
index 462e7fc0ae..32227a2214 100644
--- a/docs/en/12-taos-sql/05-insert.md
+++ b/docs/en/12-taos-sql/05-insert.md
@@ -1,7 +1,7 @@
 ---
 title: Insert
 sidebar_label: Insert
-description: This document describes how to insert data into TDengine.
+description: This document describes the SQL commands and syntax for inserting data into TDengine.
 ---
 
 ## Syntax
diff --git a/docs/en/12-taos-sql/13-tmq.md b/docs/en/12-taos-sql/13-tmq.md
index d14b6da2d3..16dc9efd62 100644
--- a/docs/en/12-taos-sql/13-tmq.md
+++ b/docs/en/12-taos-sql/13-tmq.md
@@ -1,5 +1,5 @@
 ---
-title: Data Subscription
+title: Data Subscription SQL Reference
 sidebar_label: Data Subscription
 description: This document describes the SQL statements related to the data subscription component of TDengine.
 ---
diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md
index e1bf18c854..337660a36c 100644
--- a/docs/en/12-taos-sql/14-stream.md
+++ b/docs/en/12-taos-sql/14-stream.md
@@ -1,5 +1,5 @@
 ---
-title: Stream Processing
+title: Stream Processing SQL Reference
 sidebar_label: Stream Processing
 description: This document describes the SQL statements related to the stream processing component of TDengine.
 ---
@@ -148,7 +148,7 @@ T = latest event time - watermark
 
 The window closing time for each batch of data that arrives at the system is updated using the preceding formula, and all windows are closed whose closing time is less than T. If the triggering method is WINDOW_CLOSE or MAX_DELAY, the aggregate result for the window is pushed.
 
-Stream processing strategy for expired data
+## Stream processing strategy for expired data
 
 The data in expired windows is tagged as expired. TDengine stream processing provides two methods for handling such data:
 
 1. Drop the data. This is the default and often only handling method for most stream processing engines.
@@ -157,6 +157,14 @@ The data in expired windows is tagged as expired. TDengine stream processing pro
 
 In both of these methods, configuring the watermark is essential for obtaining accurate results (if expired data is dropped) and avoiding repeated triggers that affect system performance (if expired data is recalculated).
 
+## Stream processing strategy for modified data
+
+TDengine provides two ways to handle modified data, which are specified by the IGNORE UPDATE option:
+
+1. Check whether the data has been modified, i.e. IGNORE UPDATE 0: if it has been modified, recalculate the corresponding window.
+
+2. Do not check whether the data has been modified and calculate all data as incremental data, i.e. IGNORE UPDATE 1. This is the default configuration.
+
 ## Supported functions
 
 All [scalar functions](../function/#scalar-functions) are available in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the followings:
diff --git a/docs/en/12-taos-sql/26-udf.md b/docs/en/12-taos-sql/26-udf.md
index f86b535927..dec9ca217d 100644
--- a/docs/en/12-taos-sql/26-udf.md
+++ b/docs/en/12-taos-sql/26-udf.md
@@ -1,5 +1,5 @@
 ---
-title: User-Defined Functions (UDF)
+title: User-Defined Functions (UDF) SQL Reference
 sidebar_label: User-Defined Functions
 description: This document describes the SQL statements related to user-defined functions (UDF) in TDengine.
 ---
diff --git a/docs/en/14-reference/07-tdinsight/index.md b/docs/en/14-reference/07-tdinsight/index.md
index cada05d738..1bc983262e 100644
--- a/docs/en/14-reference/07-tdinsight/index.md
+++ b/docs/en/14-reference/07-tdinsight/index.md
@@ -1,5 +1,5 @@
 ---
-title: TDinsight - Grafana-based Zero-Dependency Monitoring Solution for TDengine
+title: TDinsight
 sidebar_label: TDinsight
 description: This document describes TDinsight, a monitoring solution for TDengine.
 ---
diff --git a/docs/en/25-application/01-telegraf.md b/docs/en/25-application/01-telegraf.md
index a6db826fa3..f8784e9ab9 100644
--- a/docs/en/25-application/01-telegraf.md
+++ b/docs/en/25-application/01-telegraf.md
@@ -1,5 +1,5 @@
 ---
-title: Quickly Build IT DevOps Visualization System with TDengine + Telegraf + Grafana
+title: IT Visualization with TDengine + Telegraf + Grafana
 sidebar_label: TDengine + Telegraf + Grafana
 description: This document describes how to create an IT visualization system by integrating TDengine with Telegraf and Grafana.
 ---
diff --git a/docs/examples/rust/nativeexample/examples/query.rs b/docs/examples/rust/nativeexample/examples/query.rs
new file mode 100644
index 0000000000..dfe55e8749
--- /dev/null
+++ b/docs/examples/rust/nativeexample/examples/query.rs
@@ -0,0 +1,66 @@
+use taos::*;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let dsn = "taos://localhost:6030";
+    let builder = TaosBuilder::from_dsn(dsn)?;
+
+    let taos = builder.build()?;
+
+    // ANCHOR: create_db_and_table
+    let db = "power";
+    // create database
+    taos.exec_many([
+        format!("DROP DATABASE IF EXISTS `{db}`"),
+        format!("CREATE DATABASE `{db}`"),
+        format!("USE `{db}`"),
+    ])
+    .await?;
+
+    // create table
+    taos.exec_many([
+        // create super table
+        "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
+         TAGS (`groupid` INT, `location` BINARY(24))",
+    ]).await?;
+    // ANCHOR_END: create_db_and_table
+
+    // ANCHOR: insert_data
+    // one INSERT statement writes into two child tables of power.meters
+    let inserted = taos.exec(
+        "INSERT INTO \
+         power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') \
+         VALUES \
+         (NOW + 1a, 10.30000, 219, 0.31000) \
+         (NOW + 2a, 12.60000, 218, 0.33000) \
+         (NOW + 3a, 12.30000, 221, 0.31000) \
+         power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') \
+         VALUES \
+         (NOW + 1a, 10.30000, 218, 0.25000)",
+    )
+    .await?;
+
+    println!("inserted: {} rows", inserted);
+    // ANCHOR_END: insert_data
+
+    // ANCHOR: query_data
+    let mut result = taos.query("SELECT * FROM power.meters").await?;
+
+    for field in result.fields() {
+        println!("got field: {}", field.name());
+    }
+
+    let mut rows = result.rows();
+    let mut nrows = 0;
+    while let Some(row) = rows.try_next().await? {
+        for (col, (name, value)) in row.enumerate() {
+            println!(
+                "[{}] got value in col {} (named `{:>8}`): {}",
+                nrows, col, name, value
+            );
+        }
+        nrows += 1;
+    }
+    // ANCHOR_END: query_data
+
+    // ANCHOR: query_with_req_id
+    let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?;
+    // ANCHOR_END: query_with_req_id
+
+    Ok(())
+}
diff --git a/docs/examples/rust/nativeexample/examples/schemaless.rs b/docs/examples/rust/nativeexample/examples/schemaless.rs
new file mode 100644
index 0000000000..44ce0fe694
--- /dev/null
+++ b/docs/examples/rust/nativeexample/examples/schemaless.rs
@@ -0,0 +1,80 @@
+use taos_query::common::SchemalessPrecision;
+use taos_query::common::SchemalessProtocol;
+use taos_query::common::SmlDataBuilder;
+
+// TaosBuilder and the AsyncTBuilder/AsyncQueryable traits used below are re-exported by the taos crate
+use taos::*;
+
+async fn put() -> anyhow::Result<()> {
+    std::env::set_var("RUST_LOG", "taos=debug");
+    pretty_env_logger::init();
+    let dsn =
+        std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
+    log::debug!("dsn: {:?}", &dsn);
+
+    let client = TaosBuilder::from_dsn(dsn)?.build().await?;
+
+    let db = "power";
+
+    client.exec(format!("drop database if exists {db}")).await?;
+
+    client
+        .exec(format!("create database if not exists {db}"))
+        .await?;
+
+    // should specify database before insert
+    client.exec(format!("use {db}")).await?;
+
+    // SchemalessProtocol::Line
+    let data = [
+        "meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
+    ]
+    .map(String::from)
+    .to_vec();
+
+    let sml_data = SmlDataBuilder::default()
+        .protocol(SchemalessProtocol::Line)
+        .precision(SchemalessPrecision::Millisecond)
+        .data(data.clone())
+        .ttl(1000)
+        .req_id(100u64)
+        .build()?;
+    assert_eq!(client.put(&sml_data).await?, ());
+
+    // SchemalessProtocol::Telnet
+    let data = [
+        "meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
+    ]
+    .map(String::from)
+    .to_vec();
+
+    let sml_data = SmlDataBuilder::default()
+        .protocol(SchemalessProtocol::Telnet)
+        .precision(SchemalessPrecision::Millisecond)
+        .data(data.clone())
+        .ttl(1000)
+        .req_id(200u64)
+        .build()?;
+    assert_eq!(client.put(&sml_data).await?, ());
+
+    // SchemalessProtocol::Json
+    let data = [
+        r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
+    ]
+    .map(String::from)
+    .to_vec();
+
+    let sml_data = SmlDataBuilder::default()
+        .protocol(SchemalessProtocol::Json)
+        .precision(SchemalessPrecision::Millisecond)
+        .data(data.clone())
+        .ttl(1000)
+        .req_id(300u64)
+        .build()?;
+    assert_eq!(client.put(&sml_data).await?, ());
+
+    client.exec(format!("drop database if exists {db}")).await?;
+
+    Ok(())
+}
+
+// entry point so the example can run as a cargo example binary
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    put().await
+}
diff --git a/docs/examples/rust/nativeexample/examples/stmt.rs b/docs/examples/rust/nativeexample/examples/stmt.rs
new file mode 100644
index 0000000000..0194eccdf1
--- /dev/null
+++ b/docs/examples/rust/nativeexample/examples/stmt.rs
@@ -0,0 +1,37 @@
+use taos::*;
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let taos =
TaosBuilder::from_dsn("taos://")?.build().await?; + + taos.exec("DROP DATABASE IF EXISTS power").await?; + taos.create_database("power").await?; + taos.use_database("power").await?; + taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?; + + let mut stmt = Stmt::init(&taos).await?; + stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?; + + const NUM_TABLES: usize = 10; + const NUM_ROWS: usize = 10; + for i in 0..NUM_TABLES { + let table_name = format!("d{}", i); + let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)]; + stmt.set_tbname_tags(&table_name, &tags).await?; + for j in 0..NUM_ROWS { + let values = vec![ + ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]), + ColumnView::from_floats(vec![10.3 + j as f32]), + ColumnView::from_ints(vec![219 + j as i32]), + ColumnView::from_floats(vec![0.31 + j as f32]), + ]; + stmt.bind(&values).await?; + } + stmt.add_batch().await?; + } + + // execute. + let rows = stmt.execute().await?; + assert_eq!(rows, NUM_TABLES * NUM_ROWS); + Ok(()) +} diff --git a/docs/examples/rust/nativeexample/examples/tmq.rs b/docs/examples/rust/nativeexample/examples/tmq.rs new file mode 100644 index 0000000000..764c0c1fc8 --- /dev/null +++ b/docs/examples/rust/nativeexample/examples/tmq.rs @@ -0,0 +1,166 @@ +use std::time::Duration; +use std::str::FromStr; + +use taos::*; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + pretty_env_logger::formatted_timed_builder() + .filter_level(log::LevelFilter::Info) + .init(); + use taos_query::prelude::*; + let dsn = "taos://localhost:6030".to_string(); + log::info!("dsn: {}", dsn); + let mut dsn = Dsn::from_str(&dsn)?; + + let taos = TaosBuilder::from_dsn(&dsn)?.build().await?; + + // prepare database and table + taos.exec_many([ + "drop topic if exists topic_meters", + "drop database if exists power", + "create database if not exists power WAL_RETENTION_PERIOD 86400", + "use power", + + "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))", + + "create table if not exists power.d001 using power.meters tags(1,'location')", + + ]) + .await?; + + taos.exec_many([ + "drop database if exists db2", + "create database if not exists db2 wal_retention_period 3600", + "use db2", + ]) + .await?; + + // ANCHOR: create_topic + taos.exec_many([ + "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters", + ]) + .await?; + // ANCHOR_END: create_topic + + // ANCHOR: create_consumer + dsn.params.insert("group.id".to_string(), "abc".to_string()); + dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string()); + + let builder = TmqBuilder::from_dsn(&dsn)?; + let mut consumer = builder.build().await?; + // ANCHOR_END: create_consumer + + // ANCHOR: subscribe + consumer.subscribe(["topic_meters"]).await?; + // ANCHOR_END: subscribe + + // ANCHOR: consume + { + let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1)); + + while let Some((offset, message)) = stream.try_next().await? 
{ + + let topic: &str = offset.topic(); + let database = offset.database(); + let vgroup_id = offset.vgroup_id(); + log::debug!( + "topic: {}, database: {}, vgroup_id: {}", + topic, + database, + vgroup_id + ); + + match message { + MessageSet::Meta(meta) => { + log::info!("Meta"); + let raw = meta.as_raw_meta().await?; + taos.write_raw_meta(&raw).await?; + + let json = meta.as_json_meta().await?; + let sql = json.to_string(); + if let Err(err) = taos.exec(sql).await { + println!("maybe error: {}", err); + } + } + MessageSet::Data(data) => { + log::info!("Data"); + while let Some(data) = data.fetch_raw_block().await? { + log::debug!("data: {:?}", data); + } + } + MessageSet::MetaData(meta, data) => { + log::info!("MetaData"); + let raw = meta.as_raw_meta().await?; + taos.write_raw_meta(&raw).await?; + + let json = meta.as_json_meta().await?; + let sql = json.to_string(); + if let Err(err) = taos.exec(sql).await { + println!("maybe error: {}", err); + } + + while let Some(data) = data.fetch_raw_block().await? { + log::debug!("data: {:?}", data); + } + } + } + consumer.commit(offset).await?; + } + } + // ANCHOR_END: consume + + // ANCHOR: assignments + let assignments = consumer.assignments().await.unwrap(); + log::info!("assignments: {:?}", assignments); + // ANCHOR_END: assignments + + // seek offset + for topic_vec_assignment in assignments { + let topic = &topic_vec_assignment.0; + let vec_assignment = topic_vec_assignment.1; + for assignment in vec_assignment { + let vgroup_id = assignment.vgroup_id(); + let current = assignment.current_offset(); + let begin = assignment.begin(); + let end = assignment.end(); + log::debug!( + "topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}", + topic, + vgroup_id, + current, + begin, + end + ); + // ANCHOR: seek_offset + let res = consumer.offset_seek(topic, vgroup_id, end).await; + if res.is_err() { + log::error!("seek offset error: {:?}", res); + let a = consumer.assignments().await.unwrap(); + log::error!("assignments: {:?}", a); + } + // ANCHOR_END: seek_offset + } + + let topic_assignment = consumer.topic_assignment(topic).await; + log::debug!("topic assignment: {:?}", topic_assignment); + } + + // after seek offset + let assignments = consumer.assignments().await.unwrap(); + log::info!("after seek offset assignments: {:?}", assignments); + + // ANCHOR: unsubscribe + consumer.unsubscribe().await; + // ANCHOR_END: unsubscribe + + tokio::time::sleep(Duration::from_secs(1)).await; + + taos.exec_many([ + "drop database db2", + "drop topic topic_meters", + "drop database power", + ]) + .await?; + Ok(()) +} diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 2ed3c9afae..125f868758 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -201,9 +201,9 @@ TDengine 对于过期数据提供两种处理方式,由 IGNORE EXPIRED 选项 TDengine 对于修改数据提供两种处理方式,由 IGNORE UPDATE 选项指定: -1. 检查数据是否被修改,即 IGNORE UPDATE 0:默认配置,如果被修改,则重新计算对应窗口。 +1. 检查数据是否被修改,即 IGNORE UPDATE 0,如果数据被修改,则重新计算对应窗口。 -2. 不检查数据是否被修改,全部按增量数据计算,即 IGNORE UPDATE 1。 +2. 
不检查数据是否被修改,全部按增量数据计算,即 IGNORE UPDATE 1,默认配置。 ## 写入已存在的超级表 diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index fa6e4c3ae8..cf6d4ba2b0 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData bool alreadyAddGroupId(char* ctbName); bool isAutoTableName(char* ctbName); -void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId); +void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId); char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5f3761d7b7..138fad0ddb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; typedef struct SStreamTaskSM SStreamTaskSM; -#define SSTREAM_TASK_VER 3 +#define SSTREAM_TASK_VER 4 #define SSTREAM_TASK_INCOMPATIBLE_VER 1 #define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 @@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data int8_t dispatchMsgType; + int64_t checkpointId;// checkpoint id msg + int32_t transId; // transId for current checkpoint int16_t msgType; // dispatch msg type int32_t retryCount; // retry send data count int64_t startTs; // dispatch start time, record total elapsed time for dispatch diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c8fc556150..ce149921e3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1626,6 +1626,22 @@ void changeByteEndian(char* pData){ } } +static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, int64_t *rows, int32_t *precision){ + if(*(int64_t*)pRetrieve == 0){ + *rawData = ((SRetrieveTableRsp*)pRetrieve)->data; + *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); + if(precision != NULL){ + *precision = ((SRetrieveTableRsp*)pRetrieve)->precision; + } + }else if(*(int64_t*)pRetrieve == 1){ + *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; + *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); + if(precision != NULL){ + *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision; + } + } +} + static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) { (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); @@ -1648,13 +1664,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg void* rawData = NULL; int64_t rows = 0; // deal with compatibility - if(*(int64_t*)pRetrieve == 0){ - rawData = ((SRetrieveTableRsp*)pRetrieve)->data; - rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows); - }else if(*(int64_t*)pRetrieve == 1){ - rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data; - rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows); - } + tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL); pVg->numOfRows += rows; (*numOfRows) += rows; @@ -2625,18 +2635,22 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { pRspObj->resIter++; if (pRspObj->resIter < pRspObj->rsp.blockNum) { - SRetrieveTableRspForTmq* pRetrieveTmq = - 
(SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); if (pRspObj->rsp.withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); } - pRspObj->resInfo.pData = (void*)pRetrieveTmq->data; - pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows); + void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + void* rawData = NULL; + int64_t rows = 0; + int32_t precision = 0; + tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision); + + pRspObj->resInfo.pData = rawData; + pRspObj->resInfo.numOfRows = rows; pRspObj->resInfo.current = 0; - pRspObj->resInfo.precision = pRetrieveTmq->precision; + pRspObj->resInfo.precision = precision; // TODO handle the compressed case pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f4455be206..dc1c27b123 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2141,10 +2141,14 @@ _end: return TSDB_CODE_SUCCESS; } -void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){ +void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){ char tmp[TSDB_TABLE_NAME_LEN] = {0}; - snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); - ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end + if (stbName == NULL){ + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); + }else{ + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId); + } + ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end strcat(ctbName, tmp); } @@ -2154,6 +2158,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0 bool alreadyAddGroupId(char* ctbName) { size_t len = strlen(ctbName); + if (len == 0) return false; size_t _location = len - 1; while (_location > 0) { if (ctbName[_location] < '0' || ctbName[_location] > '9') { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c543f3702b..2ad8245a5a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -98,7 +98,7 @@ int64_t tsDndUpTime = 0; // dnode misc uint32_t tsEncryptionKeyChksum = 0; -int8_t tsEncryptionKeyStat = ENCRYPT_KEY_STAT_UNKNOWN; +int8_t tsEncryptionKeyStat = ENCRYPT_KEY_STAT_UNSET; int8_t tsGrant = 1; // monitor diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 684742fe53..a7c64e2fb9 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -114,9 +114,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { req.clusterCfg.checkTime = 0; req.clusterCfg.ttlChangeOnWrite = tsTtlChangeOnWrite; req.clusterCfg.enableWhiteList = tsEnableWhiteList ? 1 : 0; - req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat; // ENCRYPT_TODO - req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum; // ENCRYPT_TODO - // pMgmt->pData->dnodeId == 1 ? 
0 : pMgmt->pData->dnodeId + 10; // tsEncryptionKeyChksum; // ENCRYPT_TODO + req.clusterCfg.encryptionKeyStat = tsEncryptionKeyStat; + req.clusterCfg.encryptionKeyChksum = tsEncryptionKeyChksum; char timestr[32] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1eae57d158..655a0d7edb 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -161,6 +161,7 @@ typedef struct { ETrnConflct conflict; ETrnExec exec; EOperType oper; + bool changeless; int32_t code; int32_t failedTimes; void* rpcRsp; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 8689df98af..8c9ca87fb1 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -81,6 +81,7 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId); void mndTransSetSerial(STrans *pTrans); void mndTransSetParallel(STrans *pTrans); +void mndTransSetChangeless(STrans *pTrans); void mndTransSetOper(STrans *pTrans, EOperType oper); int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans); #ifndef BUILD_NO_CALL diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 52671f6b66..ed9333f480 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -292,16 +292,16 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){ int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp); if (tlen <= 0){ - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_TMQ_INVALID_MSG; } void *buf = rpcMallocCont(tlen); if (buf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){ + if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){ rpcFreeCont(buf); - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_TMQ_INVALID_MSG; } pMsg->info.rsp = buf; pMsg->info.rspLen = tlen; @@ -316,7 +316,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = NULL; if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 3f69c7def3..091edc6ab0 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, innerSz) < 0) return -1; for (int32_t j = 0; j < innerSz; j++) { SStreamTask *pTask = taosArrayGetP(pArray, j); - pTask->ver = SSTREAM_TASK_VER; + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + pTask->ver = SSTREAM_TASK_VER; + } if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; } } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 5a2525057e..33a54b13c7 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -1634,7 +1634,7 @@ static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp) { const STraceId *trace = &pRsp->info.traceId; mGInfo("msg:%p, create encrypt key rsp, nReq:%" PRIi16 
", nSucess:%" PRIi16 ", nFailed:%" PRIi16 ", %s", pRsp, nReq, - nSuccess, nFailed, finished ? "encrypt done" : "in encrypting") return 0; + nSuccess, nFailed, finished ? "encrypt done" : "in encrypting"); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6067af199e..ff05db417e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { SEncoder encoder; tEncoderInit(&encoder, NULL, 0); - pTask->ver = SSTREAM_TASK_VER; + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + pTask->ver = SSTREAM_TASK_VER; + } tEncodeStreamTask(&encoder, pTask); int32_t size = encoder.pos; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3af372a432..0e4f4210fb 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -739,6 +739,8 @@ void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; } +void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; } + void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; } static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { @@ -862,7 +864,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } - if (taosArrayGetSize(pTrans->commitActions) <= 0) { + if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) { terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 67ad2d5067..1b66c13ba2 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2343,24 +2343,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra return -1; } - if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) { - mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId); - return -1; - } - mndReleaseDb(pMnode, pDb); - - SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup); - if (pRaw == NULL) { - mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId); - return -1; - } - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { - sdbFreeRaw(pRaw); - mError("trans:%d, vgid:%d failed to append commit log dnode:%d", pTrans->id, vgid, dnodeId); - return -1; - } - (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); } else { mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist, online); diff --git a/source/dnode/vnode/src/meta/metaSnapshot.c b/source/dnode/vnode/src/meta/metaSnapshot.c index e86ed3b657..5850e794fa 100644 --- a/source/dnode/vnode/src/meta/metaSnapshot.c +++ b/source/dnode/vnode/src/meta/metaSnapshot.c @@ -79,6 +79,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { int32_t nKey = 0; int32_t nData = 0; STbDbKey key; + SMetaInfo info; *ppData = NULL; for (;;) { @@ -91,7 +92,8 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) { goto _exit; } - if (key.version < pReader->sver) { + if (key.version < pReader->sver // + || metaGetInfo(pReader->pMeta, key.uid, &info, NULL) == TSDB_CODE_NOT_FOUND) { tdbTbcMoveToNext(pReader->pTbc); continue; } diff --git 
a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 5374b9aa78..1e0ae7a854 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p if (varTbName != NULL && varTbName != (void*)-1) { name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); memcpy(name, varDataVal(varTbName), varDataLen(varTbName)); - if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) { - buildCtbNameAddGroupId(name, groupId); + if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) { + buildCtbNameAddGroupId(stbFullName, name, groupId); } } else if (stbFullName) { name = buildCtbNameByGroupId(stbFullName, groupId); @@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa int64_t gid, bool newSubTableRule) { if (pDataBlock->info.parTbName[0]) { if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) && - !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) { + !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) { pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN); strcpy(pCreateTableReq->name, pDataBlock->info.parTbName); - buildCtbNameAddGroupId(pCreateTableReq->name, gid); + buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid); // tqDebug("gen name from:%s", pDataBlock->info.parTbName); } else { pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName); @@ -671,10 +671,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat memset(dstTableName, 0, TSDB_TABLE_NAME_LEN); buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName); } else { - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 && - !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) { + if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && + !alreadyAddGroupId(dstTableName) && groupId != 0) { tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); - buildCtbNameAddGroupId(dstTableName, groupId); + if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + buildCtbNameAddGroupId(NULL, dstTableName, groupId); + }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) { + buildCtbNameAddGroupId(stbFullName, dstTableName, groupId); + } } } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1b67dce9b0..0f7f74f78b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); + taosThreadMutexLock(&pTask->lock); + // clear flag set during do checkpoint, and open inputQ for all upstream tasks if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", + pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); streamTaskClearCheckInfo(pTask, true); streamTaskSetStatusReady(pTask); } + taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } diff --git 
a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9a480359ed..add2955a2b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -45,8 +45,8 @@ static FilterCondType checkTagCond(SNode* cond); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SStorageAPI* pAPI); static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* pTagCond, SStorageAPI* pStoreAPI); -static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, - STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); +static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, + STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI); static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } @@ -642,7 +642,8 @@ int32_t getColInfoResultForGroupby(void* pVnode, SNodeList* group, STableListInf info->groupId = calcGroupId(keyBuf, len); if (initRemainGroups) { // groupId ~ table uid - taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), sizeof(info->uid)); + taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid), + sizeof(info->uid)); } } @@ -858,7 +859,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S } SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, - SStorageAPI* pStorageAPI) { + SStorageAPI* pStorageAPI) { SSDataBlock* pResBlock = createDataBlock(); if (pResBlock == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -939,11 +940,12 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S return pResBlock; } -static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) { +static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, + bool* pResultList, bool addUid) { taosArrayClear(pUidList); STableKeyInfo info = {.uid = 0, .groupId = 0}; - int32_t numOfTables = taosArrayGetSize(pUidTagList); + int32_t numOfTables = taosArrayGetSize(pUidTagList); for (int32_t i = 0; i < numOfTables; ++i) { if (pResultList[i]) { uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid; @@ -1143,7 +1145,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid); } else { - qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); + qDebug("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList)); } } } @@ -1165,7 +1167,8 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S memcpy(pPayload + sizeof(int32_t), taosArrayGet(pUidList, 0), numOfTables * sizeof(uint64_t)); } - pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), pPayload, size, 1); + pStorageAPI->metaFn.putCachedTableList(pVnode, pScanNode->suid, context.digest, tListLen(context.digest), + 
pPayload, size, 1); digest[0] = 1; memcpy(digest + 1, context.digest, tListLen(context.digest)); } @@ -1725,7 +1728,8 @@ SColumn extractColumnFromColumnNode(SColumnNode* pColNode) { return c; } -int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle) { +int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, + const SReadHandle* readHandle) { pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pCond->numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); @@ -1748,8 +1752,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi // allowed read stt file optimization mode pCond->notLoadData = (pTableScanNode->dataRequired == FUNC_DATA_REQUIRED_NOT_LOAD) && - (pTableScanNode->scan.node.pConditions == NULL) && - (pTableScanNode->interval == 0); + (pTableScanNode->scan.node.pConditions == NULL) && (pTableScanNode->interval == 0); int32_t j = 0; for (int32_t i = 0; i < pCond->numOfCols; ++i) { @@ -1891,7 +1894,8 @@ void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t orde int32_t factor = GET_FORWARD_DIRECTION_FACTOR(order); slidingStart = taosTimeAdd(slidingStart, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision); tw->skey = taosTimeAdd(slidingStart, pInterval->offset, pInterval->offsetUnit, pInterval->precision); - int64_t slidingEnd = taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; + int64_t slidingEnd = + taosTimeAdd(slidingStart, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; tw->ekey = taosTimeAdd(slidingEnd, pInterval->offset, pInterval->offsetUnit, pInterval->precision); } @@ -2136,7 +2140,7 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* if (groupSort && groupByTbname) { taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); pTableListInfo->numOfOuputGroups = numOfTables; - } else if (groupByTbname && pScanNode->groupOrderScan){ + } else if (groupByTbname && pScanNode->groupOrderScan) { pTableListInfo->numOfOuputGroups = numOfTables; } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) { pTableListInfo->numOfOuputGroups = numOfTables; @@ -2147,7 +2151,8 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* bool initRemainGroups = false; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode)) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pScanNode; - if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && !(groupSort || pScanNode->groupOrderScan)) { + if (tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable && + !(groupSort || pScanNode->groupOrderScan)) { initRemainGroups = true; } } @@ -2271,7 +2276,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr } if (qDebugFlag & DEBUG_DEBUG) { char* pBuf = NULL; - char flagBuf[64]; + char flagBuf[64]; snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr)); taosMemoryFree(pBuf); @@ -2280,7 +2285,7 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? 
win->skey : tsCols[0]; } -void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { +void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { int64_t* ts = (int64_t*)pColData->pData; int64_t duration = pWin->ekey - pWin->skey + delta; @@ -2289,13 +2294,14 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t ts[4] = pWin->ekey + delta; // window end key } -int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) { +int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, + int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; const char* isNull = oldkeyBuf; const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size; for (int32_t i = 0; i < pSortGroupCols->size; ++i) { - const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); + const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i); const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId); if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId]; @@ -2321,8 +2327,7 @@ int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t ol return 0; } -int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, - int32_t rowIndex) { +int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex) { uint32_t colNum = pSortGroupCols->size; SColumnDataAgg* pColAgg = NULL; char* isNull = keyBuf; @@ -2370,7 +2375,7 @@ uint64_t calcGroupId(char* pData, int32_t len) { } SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) { - SNode* node; + SNode* node; SNodeList* ret = NULL; FOREACH(node, pSortKeys) { SOrderByExprNode* pSortKey = (SOrderByExprNode*)node; @@ -2386,6 +2391,6 @@ int32_t extractKeysLen(const SArray* keys) { SColumn* pCol = (SColumn*)taosArrayGet(keys, i); len += pCol->bytes; } - len += sizeof(int8_t) * keyNum; //null flag + len += sizeof(int8_t) * keyNum; // null flag return len; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 628aacf3c3..903e2f85aa 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -983,7 +983,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, : (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE); tagVarChar = taosMemoryCalloc(1, bufSize + 1); int32_t len = -1; - convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len); + if (tagLen > 0) + convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len); + else + len = 0; varDataSetLen(tagVarChar, len); } } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index aa33d6f2fc..079fd7d29d 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -486,6 +486,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); + rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -526,6 +527,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) 
{ if (code) { qError("hb rsp error:%s", tstrerror(code)); + rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); SCH_ERR_JRET(code); } @@ -1181,7 +1183,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, qMsg.queryId = pJob->queryId; qMsg.taskId = pTask->taskId; qMsg.refId = pJob->refId; - qMsg.execId = pTask->execId; + qMsg.execId = *(int32_t*)param; msgSize = tSerializeSTaskDropReq(NULL, 0, &qMsg); if (msgSize < 0) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index d96c01fc76..97c3c7d276 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -371,14 +371,13 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, pCtx->roundTotal = pEpSet->numOfEps; } - if (pCtx->roundTimes >= pCtx->roundTotal) { int64_t nowTs = taosGetTimestampMs(); int64_t lastTime = nowTs - pCtx->startTs; if (lastTime > tsMaxRetryWaitTime) { SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - pJob->noMoreRetry = true; + pJob->noMoreRetry = true; SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode)); } @@ -418,7 +417,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; - pTask->childReady = 0; + pTask->childReady = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } @@ -505,11 +504,11 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i pLevel->taskExecDoneNum = 0; pLevel->taskLaunchedNum = 0; } - + SCH_RESET_JOB_LEVEL_IDX(pJob); - + code = schDoTaskRedirect(pJob, pTask, pData, rspCode); - + taosMemoryFreeClear(pData->pData); taosMemoryFreeClear(pData->pEpSet); @@ -627,7 +626,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo pTask->maxRetryTimes); return TSDB_CODE_SUCCESS; } - + if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) { pTask->maxExecTimes++; pTask->maxRetryTimes++; @@ -862,7 +861,9 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { while (nodeInfo) { if (nodeInfo->handle) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, NULL); + void *pExecId = taosHashGetKey(nodeInfo, NULL); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); + SCH_TASK_DLOG("start to drop task's %dth execNode", i); } else { SCH_TASK_DLOG("no need to drop task %dth execNode", i); @@ -901,7 +902,6 @@ int32_t schNotifyTaskOnExecNode(SSchJob *pJob, SSchTask *pTask, ETaskNotifyType return TSDB_CODE_SUCCESS; } - int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { int32_t taskNum = (int32_t)taosArrayGetSize(pStatusList); SSchTask *pTask = NULL; @@ -1269,7 +1269,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t int32_t code = TSDB_CODE_SUCCESS; SCH_ERR_RET(schNotifyTaskOnExecNode(pJob, pCurrTask, type)); - + void *pIter = taosHashIterate(list, NULL); while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; @@ -1277,7 +1277,7 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_LOCK_TASK(pTask); code = schNotifyTaskOnExecNode(pJob, pTask, type); SCH_UNLOCK_TASK(pTask); - + if (TSDB_CODE_SUCCESS != code) { break; } @@ -1289,7 +1289,6 @@ int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType t SCH_RET(code); 
} - int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) { SCH_RET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask), NULL)); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f58c72eded..7f52c5d2f0 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -278,6 +278,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.numOfNotReady = 0; pTask->chkInfo.transId = 0; pTask->chkInfo.dispatchCheckpointTrigger = false; + pTask->chkInfo.downstreamAlignNum = 0; streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks if (clearChkpReadyMsg) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0af664f1e1..baf5ebf8cb 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); } + pMsgInfo->checkpointId = -1; + pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; } @@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.dispatchMsgType = pData->type; + if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + SSDataBlock* p = taosArrayGet(pData->blocks, 0); + pTask->msgInfo.checkpointId = p->info.version; + pTask->msgInfo.transId = p->info.window.ekey; + } + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); @@ -580,12 +588,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } else { char ctbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pDataBlock->info.parTbName[0]) { - if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && - pTask->subtableWithoutMd5 != 1 && + if(pTask->subtableWithoutMd5 != 1 && !isAutoTableName(pDataBlock->info.parTbName) && !alreadyAddGroupId(pDataBlock->info.parTbName) && groupId != 0){ - buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId); + if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId); + }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) { + buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId); + } } } else { buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); @@ -947,9 +958,21 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { // this message has been sent successfully, let's try next one. 
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); + bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { - pTask->chkInfo.dispatchCheckpointTrigger = true; + taosThreadMutexLock(&pTask->lock); + // we only set the dispatch msg info for current checkpoint trans + if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) { + ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId); + pTask->chkInfo.dispatchCheckpointTrigger = true; + stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed", + pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); + } else { + stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired", + pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId); + } + taosThreadMutexUnlock(&pTask->lock); } clearBufferedDispatchMsg(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c48a469f41..b2fdd247f1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; int32_t code; - pTask->ver = SSTREAM_TASK_VER; tEncodeSize(tEncodeStreamTask, pTask, len, code); if (code < 0) { return -1; @@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return -1; } + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + pTask->ver = SSTREAM_TASK_VER; + } SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); tEncodeStreamTask(&encoder, pTask); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 6aa215586a..cfa94209f6 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { return; } - taosThreadMutexLock(&pTask->lock); - pSM->prev.state = pSM->current; pSM->prev.evt = 0; @@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) { pSM->startTs = taosGetTimestampMs(); pSM->pActiveTrans = NULL; taosArrayClear(pSM->pWaitingEventList); - - taosThreadMutexUnlock(&pTask->lock); } STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index c010e31320..da6d71e07b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -19,16 +19,12 @@ extern "C" { #endif #include -#include "os.h" -#include "taoserror.h" #include "theap.h" -#include "tmisce.h" #include "tmsg.h" #include "transLog.h" #include "transportInt.h" #include "trpc.h" #include "ttrace.h" -#include "tutil.h" typedef bool (*FilteFunc)(void* arg); @@ -115,9 +111,12 @@ typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { - void* handle; - int64_t refId; - void* pThrd; + void* handle; + int64_t refId; + void* pThrd; + queue q; + int8_t inited; + SRWLatch latch; } SExHandle; typedef struct { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2d8f4ed3c2..062609baac 100644 --- a/source/libs/transport/src/transCli.c +++ 
b/source/libs/transport/src/transCli.c @@ -92,6 +92,7 @@ typedef struct SCliMsg { int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) + queue seqq; } SCliMsg; typedef struct SCliThrd { @@ -121,11 +122,7 @@ typedef struct SCliThrd { SHashObj* batchCache; SCliMsg* stopMsg; - - bool quit; - - int newConnCount; - SHashObj* msgCount; + bool quit; } SCliThrd; typedef struct SCliObj { @@ -262,10 +259,8 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); } \ if (i == sz) { \ pMsg = NULL; \ - tDebug("msg not found, %" PRIu64 "", ahandle); \ } else { \ pMsg = transQueueRm(&conn->cliMsgs, i); \ - tDebug("msg found, %" PRIu64 "", ahandle); \ } \ } while (0) @@ -343,6 +338,34 @@ bool cliMaySendCachedMsg(SCliConn* conn) { _RETURN: return false; } +bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { + if (refId == 0) return false; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh == NULL) { + tDebug("release conn %p, refId: %" PRId64 "", conn, refId); + return false; + } + taosWLockLatch(&exh->latch); + if (exh->handle == NULL) exh->handle = conn; + exh->inited = 1; + if (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + taosWUnLockLatch(&exh->latch); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + transCtxMerge(&conn->ctx, &t->ctx->appCtx); + transQueuePush(&conn->cliMsgs, t); + tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + cliSend(conn); + return true; + } + taosWUnLockLatch(&exh->latch); + tDebug("empty conn %p, refId: %" PRId64 "", conn, refId); + transReleaseExHandle(transGetRefMgt(), refId); + return false; +} + void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; @@ -439,8 +462,14 @@ void cliHandleResp(SCliConn* conn) { return; } } + int64_t refId = (pMsg == NULL ? 0 : (int64_t)(pMsg->msg.info.handle)); + tDebug("conn %p msg refId: %" PRId64 "", conn, refId); destroyCmsg(pMsg); + if (cliConnSendSeqMsg(refId, conn)) { + return; + } + if (cliMaySendCachedMsg(conn) == true) { return; } @@ -451,6 +480,21 @@ void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); } +static void cliDestroyMsgInExhandle(int64_t refId) { + if (refId == 0) return; + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); + if (exh) { + taosWLockLatch(&exh->latch); + while (!QUEUE_IS_EMPTY(&exh->q)) { + queue* h = QUEUE_HEAD(&exh->q); + QUEUE_REMOVE(h); + SCliMsg* t = QUEUE_DATA(h, SCliMsg, seqq); + destroyCmsg(t); + } + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); + } +} void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { if (transQueueEmpty(&pConn->cliMsgs)) { @@ -510,6 +554,8 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { } if (pMsg == NULL || (pMsg && pMsg->type != Release)) { + int64_t refId = (pMsg == NULL ? 
0 : (int64_t)(pMsg->msg.info.handle)); + cliDestroyMsgInExhandle(refId); if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; } @@ -678,7 +724,7 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } list->numOfConn++; } - tTrace("%s numOfConn: %d, limit: %d", pTransInst->label, list->numOfConn, pTransInst->connLimitNum); + tDebug("%s numOfConn: %d, limit: %d, dst:%s", pTransInst->label, list->numOfConn, pTransInst->connLimitNum, key); return NULL; } @@ -742,13 +788,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; - if (conn->list->size >= 20) { + if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, 10 * CONN_PERSIST_TIME(pTransInst->idleTime)); } } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -761,8 +807,10 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); - conn->refId = exh->refId; + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); + conn->refId = exh->refId; if (conn->refId == -1) { taosMemoryFree(exh); } @@ -779,9 +827,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { if (exh == NULL) { return -1; } + taosWLockLatch(&exh->latch); exh->handle = conn; exh->pThrd = conn->hostThrd; conn->refId = exh->refId; + taosWUnLockLatch(&exh->latch); transReleaseExHandle(transGetRefMgt(), handle); return 0; @@ -882,7 +932,6 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } conn->list = NULL; - pThrd->newConnCount--; transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -1190,7 +1239,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(pList->port); tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1392,7 +1440,10 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { return; } + taosRLockLatch(&exh->latch); SCliConn* conn = exh->handle; + taosRUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), refId); tDebug("%s conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); @@ -1425,7 +1476,9 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) *ignore = true; return NULL; } else { + taosRLockLatch(&exh->latch); conn = exh->handle; + taosRUnLockLatch(&exh->latch); if (conn == NULL) { conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); @@ -1439,7 +1492,7 @@ SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { - tTrace("%s not found conn in conn pool:%p", ((STrans*)pThrd->pTransInst)->label, pThrd->pool); + tTrace("%s not found conn in conn pool:%p, dst:%s", ((STrans*)pThrd->pTransInst)->label, pThrd->pool, addr); } return conn; } @@ -1598,7 +1651,6 @@ 
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_port = (uint16_t)htons(port); tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); - pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd == -1) { tGError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1858,9 +1910,10 @@ void cliIteraConnMsgs(SCliConn* conn) { bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { if (pHead->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { uint64_t ahandle = pHead->ahandle; - tDebug("ahandle = %" PRIu64 "", ahandle); SCliMsg* pMsg = NULL; CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", may ignore", CONN_GET_INST_LABEL(conn), conn, + conn->refId); transClearBuffer(&conn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); @@ -1869,6 +1922,9 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { SCliMsg* cliMsg = transQueueGet(&conn->cliMsgs, i); if (cliMsg->type == Release) { ASSERTS(pMsg == NULL, "trans-cli recv invaid release-req"); + tDebug("%s conn %p receive release request, refId:%" PRId64 ", ignore msg", CONN_GET_INST_LABEL(conn), conn, + conn->refId); + cliDestroyConn(conn, true); return true; } } @@ -1984,11 +2040,9 @@ static SCliThrd* createThrdObj(void* trans) { taosMemoryFree(pThrd); return NULL; } - if (pTransInst->supportBatch) { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 4, pThrd, cliAsyncCb); - } else { - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); - } + int32_t nSync = pTransInst->supportBatch ? 4 : 8; + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); + if (pThrd->asyncPool == NULL) { tError("failed to init async pool"); uv_loop_close(pThrd->loop); @@ -2029,8 +2083,6 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->quit = false; - pThrd->newConnCount = 0; - pThrd->msgCount = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); return pThrd; } static void destroyThrdObj(SCliThrd* pThrd) { @@ -2076,7 +2128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); - taosHashCleanup(pThrd->msgCount); taosMemoryFree(pThrd); } @@ -2095,14 +2146,7 @@ void cliSendQuit(SCliThrd* thrd) { void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { if (uv_handle_get_type(handle) == UV_TIMER) { - // SCliConn* pConn = handle->data; - // if (pConn != NULL && pConn->timer != NULL) { - // SCliThrd* pThrd = pConn->hostThrd; - // uv_timer_stop((uv_timer_t*)handle); - // handle->data = NULL; - // taosArrayPush(pThrd->timerList, &pConn->timer); - // pConn->timer = NULL; - // } + // do nothing } else { uv_read_stop((uv_stream_t*)handle); } @@ -2137,18 +2181,23 @@ static void doCloseIdleConn(void* param) { cliDestroyConn(conn, true); taosMemoryFree(arg); } +static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { + if (!(rpcDebugFlag & DEBUG_DEBUG)) { + return; + } + STransConnCtx* pCtx = pMsg->ctx; + STraceId* trace = &pMsg->msg.info.traceId; + char tbuf[512] = {0}; + EPSET_TO_STR(&pCtx->epSet, tbuf); + tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, + pCtx->retryNextInterval); + return; +} static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; 
- - if (rpcDebugFlag & DEBUG_DEBUG) { - STraceId* trace = &pMsg->msg.info.traceId; - char tbuf[512] = {0}; - EPSET_TO_STR(&pCtx->epSet, tbuf); - tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", transLabel(pThrd->pTransInst), tbuf, - pCtx->retryStep, pCtx->retryNextInterval); - } + cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); arg->param1 = pMsg; @@ -2157,12 +2206,6 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); } -FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { - if (*val != exp) { - *val = newVal; - } -} - FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; @@ -2504,21 +2547,7 @@ int transReleaseCliHandle(void* handle) { } return 0; } - -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - if (pTransInst == NULL) { - transFreeMsg(pReq->pCont); - return TSDB_CODE_RPC_BROKEN_LINK; - } - - SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); - if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; - } - +static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); epsetAssign(&pCtx->epSet, pEpSet); @@ -2535,12 +2564,48 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; + QUEUE_INIT(&cliMsg->seqq); + return cliMsg; +} + +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); + if (pTransInst == NULL) { + transFreeMsg(pReq->pCont); + return TSDB_CODE_RPC_BROKEN_LINK; + } + + int64_t handle = (int64_t)pReq->info.handle; + SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); + if (pThrd == NULL) { + transFreeMsg(pReq->pCont); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return TSDB_CODE_RPC_BROKEN_LINK; + } + if (handle != 0) { + SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle); + if (exh != NULL) { + taosWLockLatch(&exh->latch); + if (exh->handle == NULL && exh->inited != 0) { + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); + taosWUnLockLatch(&exh->latch); + tDebug("msg refId: %" PRId64 "", handle); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return 0; + } + exh->inited = 1; + taosWUnLockLatch(&exh->latch); + transReleaseExHandle(transGetRefMgt(), handle); + } + } + SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, - EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { - destroyCmsg(cliMsg); + EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); + if (0 != 
transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return TSDB_CODE_RPC_BROKEN_LINK; } @@ -2726,6 +2791,8 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); + taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); return exh->refId; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f47a688e6f..21ad5be869 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -761,9 +761,12 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; - pConn->status = ConnRelease; transClearBuffer(&pConn->readBuf); transFreeMsg(transContFromHead((char*)pHead)); + if (pConn->status != ConnAcquire) { + return true; + } + pConn->status = ConnRelease; STransMsg tmsg = {.code = 0, .info.handle = (void*)pConn, .info.traceId = traceId, .info.ahandle = (void*)0x9527}; SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); @@ -1090,6 +1093,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; + QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); return pConn; @@ -1121,6 +1125,7 @@ static int reallocConnRef(SSvrConn* conn) { exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + QUEUE_INIT(&exh->q); transAcquireExHandle(transGetRefMgt(), exh->refId); conn->refId = exh->refId; diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 7c52410ae3..57a71554ec 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -11,7 +11,7 @@ # -*- coding: utf-8 -*- - +import time from util.log import * from util.cases import * from util.sql import * @@ -50,10 +50,11 @@ class TDTestCase: self.tbnum = 20 self.rowNum = 10 self.tag_dict = { - 't0':'int' + 't0':'int', + 't1':f'nchar({self.nchar_length})' } self.tag_values = [ - f'1' + f'1', '""' ] self.binary_str = 'taosdata' self.nchar_str = '涛思数据' @@ -72,7 +73,7 @@ class TDTestCase: tdSql.execute(f'use {self.dbname}') tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) for i in range(self.tbnum): - tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})") + tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]}, {self.tag_values[1]})") self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum) def count_check(self): tdSql.query('select count(*) from information_schema.ins_tables') @@ -316,13 +317,15 @@ class TDTestCase: def ins_encryptions_check(self): key_status_list = ['unknown', 'unset', 'set', 'loaded'] + # unset/none tdSql.execute('drop database if exists db2') tdSql.execute('create database if not exists db2 vgroups 1 replica 1') + time.sleep(2) tdSql.query(f'select * from information_schema.ins_encryptions') result = tdSql.queryResult index = 0 for i in range(0, len(result)): - tdSql.checkEqual(True, result[i][1] in 
key_status_list) + tdSql.checkEqual(True, result[i][1] in key_status_list[1]) index += 1 tdSql.checkEqual(True, index > 0) @@ -330,11 +333,36 @@ class TDTestCase: result = tdSql.queryResult index = 0 for i in range(0, len(result)): - tdSql.checkEqual(True, result[i][1] in key_status_list) + tdSql.checkEqual(True, result[i][1] in key_status_list[1]) + index += 1 + tdSql.checkEqual(True, index > 0) + + # loaded/sm4 + tdSql.execute('drop database if exists db2') + tdSql.execute('create encrypt_key \'12345678\'') + time.sleep(3) + tdSql.execute('create database if not exists db2 vgroups 1 replica 1 encrypt_algorithm \'sm4\'') + tdSql.query(f'select * from information_schema.ins_encryptions') + result = tdSql.queryResult + index = 0 + for i in range(0, len(result)): + tdSql.checkEqual(True, result[i][1] in key_status_list[3]) + index += 1 + tdSql.checkEqual(True, index > 0) + + tdSql.query(f'show encryptions') + result = tdSql.queryResult + index = 0 + for i in range(0, len(result)): + tdSql.checkEqual(True, result[i][1] in key_status_list[3]) index += 1 tdSql.checkEqual(True, index > 0) - # ENCRYPT_TODO: create encrypt_key 'xxx' + def test_query_ins_tags(self): + sql = f'select tag_name, tag_value from information_schema.ins_tags where table_name = "{self.stbname}_0"' + tdSql.query(sql) + tdSql.checkRows(2) + def run(self): self.prepare_data() @@ -346,6 +374,7 @@ class TDTestCase: self.ins_dnodes_check() self.ins_grants_check() self.ins_encryptions_check() + self.test_query_ins_tags() def stop(self): diff --git a/tests/system-test/7-tmq/tmq_ts4563.py b/tests/system-test/7-tmq/tmq_ts4563.py index fc1cc259ce..13f510ffe6 100644 --- a/tests/system-test/7-tmq/tmq_ts4563.py +++ b/tests/system-test/7-tmq/tmq_ts4563.py @@ -29,9 +29,11 @@ class TDTestCase: tdSql.execute(f'use db_stmt') tdSql.query("select ts,k from st") - tdSql.checkRows(2) + tdSql.checkRows(self.expected_affected_rows) tdSql.execute(f'create topic t_unorder_data as select ts,k from st') + tdSql.execute(f'create topic t_unorder_data_none as select i,k from st') + consumer_dict = { "group.id": "g1", "td.connect.user": "root", @@ -41,7 +43,7 @@ class TDTestCase: consumer = Consumer(consumer_dict) try: - consumer.subscribe(["t_unorder_data"]) + consumer.subscribe(["t_unorder_data", "t_unorder_data_none"]) except TmqError: tdLog.exit(f"subscribe error") @@ -51,18 +53,15 @@ class TDTestCase: res = consumer.poll(1) print(res) if not res: - if cnt == 0: + if cnt == 0 or cnt != 2*self.expected_affected_rows: tdLog.exit("consume error") break val = res.value() if val is None: continue for block in val: + print(block.fetchall(),len(block.fetchall())) cnt += len(block.fetchall()) - - if cnt != 2: - tdLog.exit("consume error") - finally: consumer.close() @@ -110,20 +109,32 @@ class TDTestCase: params = new_multi_binds(2) params[0].timestamp((1626861392589, 1626861392590)) params[1].int([3, None]) - + # print(type(stmt)) tdLog.debug("bind_param_batch start") stmt.bind_param_batch(params) + tdLog.debug("bind_param_batch end") stmt.execute() tdLog.debug("execute end") + conn.execute("flush database %s" % dbname) + + params1 = new_multi_binds(2) + params1[0].timestamp((1626861392587,1626861392586)) + params1[1].int([None,3]) + stmt.bind_param_batch(params1) + stmt.execute() + end = datetime.now() print("elapsed time: ", end - start) - assert stmt.affected_rows == 2 + print(stmt.affected_rows) + self.expected_affected_rows = 4 + if stmt.affected_rows != self.expected_affected_rows : + tdLog.exit("affected_rows error") tdLog.debug("close start") 
stmt.close() - + # conn.execute("drop database if exists %s" % dbname) conn.close() diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py index 3ebc255114..ff16bee787 100644 --- a/tests/system-test/8-stream/stream_basic.py +++ b/tests/system-test/8-stream/stream_basic.py @@ -78,14 +78,55 @@ class TDTestCase: tdLog.info(cmd) os.system(cmd) + def case1(self): + + tdSql.execute(f'create database if not exists d1 vgroups 1') + tdSql.execute(f'use d1') + tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)') + tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)') + tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)') + + tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT " + "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True) + + tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT " + "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True) + + time.sleep(2) + tdSql.query("select * from sta") + tdSql.checkRows(3) + tdSql.query("select tbname from sta order by tbname") + if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'): + tdLog.exit("error1") + + if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'): + tdLog.exit("error2") + + if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'): + tdLog.exit("error3") + + tdSql.query("select * from stb") + tdSql.checkRows(3) + tdSql.query("select tbname from stb order by tbname") + if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'): + tdLog.exit("error4") + + if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'): + tdLog.exit("error5") + + if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'): + tdLog.exit("error6") + # run def run(self): + self.case1() # gen data random.seed(int(time.time())) self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y") # create stream tdSql.execute("use db") - tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) + tdSql.execute("create stream stream3 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) sql = "select count(*) from sta" # loop wait max 60s to check count is ok tdLog.info("loop wait result ...") diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index a1c5405253..188abb4b58 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -18,7 +18,7 @@ IF (TD_WEBSOCKET) COMMAND git clean -f -d BUILD_COMMAND COMMAND cargo update - COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features native-tls + COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features rustls INSTALL_COMMAND COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include @@ -37,7 +37,7 @@ IF (TD_WEBSOCKET) COMMAND git clean -f -d BUILD_COMMAND COMMAND cargo update - COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored + COMMAND cargo build --release -p taos-ws-sys --features rustls INSTALL_COMMAND COMMAND cp target/release/taosws.dll ${CMAKE_BINARY_DIR}/build/lib COMMAND cp target/release/taosws.dll.lib ${CMAKE_BINARY_DIR}/build/lib/taosws.lib @@ -57,7 +57,7 @@ IF (TD_WEBSOCKET) COMMAND 
git clean -f -d BUILD_COMMAND COMMAND cargo update - COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored + COMMAND cargo build --release -p taos-ws-sys --features rustls INSTALL_COMMAND COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
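
Note on the transCli.c changes above: the new `seqq` field on `SCliMsg`, together with `cliConnSendSeqMsg()`, `cliDestroyMsgInExhandle()`, and the parking logic added to `transSendRequest()`, amounts to a per-handle pending queue. A request that targets a handle whose connection is not yet usable is pushed onto `exh->q` under `taosWLockLatch()`; each completed response pops at most one parked request and sends it, and a broken connection drains and destroys whatever is still parked. The standalone C sketch below illustrates only that park-and-drain pattern; the names in it (`PendingHandle`, `submit_request`, `drain_one`, `destroy_pending`) are invented for illustration and do not appear in the TDengine source, which uses `SExHandle`, the `QUEUE_*` intrusive-list macros, and latch primitives rather than a pthread rwlock.

```c
/*
 * Minimal sketch of the park-and-drain pattern used by the transCli.c patch:
 * requests against a handle whose connection is not ready are parked under a
 * write lock, one parked request is drained per completed response, and the
 * queue is destroyed wholesale when the connection breaks.
 *
 * All identifiers here are illustrative only.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct Request {
    char            payload[64];
    struct Request *next;
} Request;

typedef struct PendingHandle {
    pthread_rwlock_t latch;       /* plays the role of exh->latch          */
    int              conn_ready;  /* plays the role of exh->handle != NULL */
    Request         *head, *tail; /* plays the role of the intrusive exh->q */
} PendingHandle;

static void handle_init(PendingHandle *h) {
    memset(h, 0, sizeof(*h));
    pthread_rwlock_init(&h->latch, NULL);
}

/* Send immediately when the connection is usable and nothing is queued ahead;
 * otherwise park the request so ordering on this handle is preserved. */
static void submit_request(PendingHandle *h, const char *payload) {
    pthread_rwlock_wrlock(&h->latch);
    if (!h->conn_ready || h->head != NULL) {
        Request *r = calloc(1, sizeof(*r));
        if (r == NULL) { pthread_rwlock_unlock(&h->latch); return; }
        snprintf(r->payload, sizeof(r->payload), "%s", payload);
        if (h->tail) h->tail->next = r; else h->head = r;
        h->tail = r;
        pthread_rwlock_unlock(&h->latch);
        printf("parked: %s\n", payload);
        return;
    }
    pthread_rwlock_unlock(&h->latch);
    printf("sent immediately: %s\n", payload);
}

/* Called when a response completes: pop at most one parked request and send
 * it, mirroring what cliConnSendSeqMsg() does for the real queue. */
static int drain_one(PendingHandle *h) {
    pthread_rwlock_wrlock(&h->latch);
    h->conn_ready = 1;            /* the connection has proven usable */
    Request *r = h->head;
    if (r) {
        h->head = r->next;
        if (h->head == NULL) h->tail = NULL;
    }
    pthread_rwlock_unlock(&h->latch);
    if (r == NULL) return 0;
    printf("sent parked: %s\n", r->payload);
    free(r);
    return 1;
}

/* Called when the connection breaks: discard everything still parked,
 * mirroring cliDestroyMsgInExhandle(). */
static void destroy_pending(PendingHandle *h) {
    pthread_rwlock_wrlock(&h->latch);
    while (h->head) {
        Request *r = h->head;
        h->head = r->next;
        printf("dropped: %s\n", r->payload);
        free(r);
    }
    h->tail = NULL;
    pthread_rwlock_unlock(&h->latch);
}

int main(void) {
    PendingHandle h;
    handle_init(&h);

    submit_request(&h, "query-1");  /* connection not ready yet: parked      */
    submit_request(&h, "query-2");  /* parked behind query-1                 */
    drain_one(&h);                  /* a response completes: query-1 is sent */
    submit_request(&h, "query-3");  /* query-2 still parked, so park behind  */
    drain_one(&h);                  /* next response: query-2 is sent        */
    destroy_pending(&h);            /* connection breaks: query-3 is dropped */
    return 0;
}
```

Draining a single parked request per completed response is what keeps requests issued on one handle strictly ordered, which appears to be the same goal the `seqq` queue serves in the patch.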