other: merge 3.0
This commit is contained in:
commit
94e6f96a75
|
@ -1,7 +1,7 @@
|
|||
---
|
||||
title: Insert
|
||||
sidebar_label: Insert
|
||||
description: This document describes how to insert data into TDengine.
|
||||
description: This document describes the SQL commands and syntax for inserting data into TDengine.
|
||||
---
|
||||
|
||||
## Syntax
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
---
|
||||
title: Data Subscription
|
||||
title: Data Subscription SQL Reference
|
||||
sidebar_label: Data Subscription
|
||||
description: This document describes the SQL statements related to the data subscription component of TDengine.
|
||||
---
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
---
|
||||
title: Stream Processing
|
||||
title: Stream Processing SQL Reference
|
||||
sidebar_label: Stream Processing
|
||||
description: This document describes the SQL statements related to the stream processing component of TDengine.
|
||||
---
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
---
|
||||
title: User-Defined Functions (UDF)
|
||||
title: User-Defined Functions (UDF) SQL Reference
|
||||
sidebar_label: User-Defined Functions
|
||||
description: This document describes the SQL statements related to user-defined functions (UDF) in TDengine.
|
||||
---
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
---
|
||||
title: TDinsight - Grafana-based Zero-Dependency Monitoring Solution for TDengine
|
||||
title: TDinsight
|
||||
sidebar_label: TDinsight
|
||||
description: This document describes TDinsight, a monitoring solution for TDengine.
|
||||
---
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
---
|
||||
title: Quickly Build IT DevOps Visualization System with TDengine + Telegraf + Grafana
|
||||
title: IT Visualization with TDengine + Telegraf + Grafana
|
||||
sidebar_label: TDengine + Telegraf + Grafana
|
||||
description: This document describes how to create an IT visualization system by integrating TDengine with Telegraf and Grafana.
|
||||
---
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
use taos::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let dsn = "taos://localhost:6030";
|
||||
let builder = TaosBuilder::from_dsn(dsn)?;
|
||||
|
||||
let taos = builder.build()?;
|
||||
|
||||
// ANCHOR: create_db_and_table
|
||||
let db = "power";
|
||||
// create database
|
||||
taos.exec_many([
|
||||
format!("DROP DATABASE IF EXISTS `{db}`"),
|
||||
format!("CREATE DATABASE `{db}`"),
|
||||
format!("USE `{db}`"),
|
||||
])
|
||||
.await?;
|
||||
|
||||
// create table
|
||||
taos.exec_many([
|
||||
// create super table
|
||||
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
|
||||
TAGS (`groupid` INT, `location` BINARY(24))",
|
||||
]).await?;
|
||||
// ANCHOR_END: create_db_and_table
|
||||
|
||||
// ANCHOR: insert_data
|
||||
let inserted = taos.exec("INSERT INTO " +
|
||||
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
|
||||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 219, 0.31000) " +
|
||||
"(NOW + 2a, 12.60000, 218, 0.33000) " +
|
||||
"(NOW + 3a, 12.30000, 221, 0.31000) " +
|
||||
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
|
||||
"VALUES " +
|
||||
"(NOW + 1a, 10.30000, 218, 0.25000) ").await?;
|
||||
|
||||
println!("inserted: {} rows", inserted);
|
||||
// ANCHOR_END: insert_data
|
||||
|
||||
// ANCHOR: query_data
|
||||
let mut result = taos.query("SELECT * FROM power.meters").await?;
|
||||
|
||||
for field in result.fields() {
|
||||
println!("got field: {}", field.name());
|
||||
}
|
||||
|
||||
let mut rows = result.rows();
|
||||
let mut nrows = 0;
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
for (col, (name, value)) in row.enumerate() {
|
||||
println!(
|
||||
"[{}] got value in col {} (named `{:>8}`): {}",
|
||||
nrows, col, name, value
|
||||
);
|
||||
}
|
||||
nrows += 1;
|
||||
}
|
||||
// ANCHOR_END: query_data
|
||||
|
||||
// ANCHOR: query_with_req_id
|
||||
let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?;
|
||||
// ANCHOR_END: query_with_req_id
|
||||
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
use taos_query::common::SchemalessPrecision;
|
||||
use taos_query::common::SchemalessProtocol;
|
||||
use taos_query::common::SmlDataBuilder;
|
||||
|
||||
use crate::AsyncQueryable;
|
||||
use crate::AsyncTBuilder;
|
||||
use crate::TaosBuilder;
|
||||
|
||||
async fn put() -> anyhow::Result<()> {
|
||||
std::env::set_var("RUST_LOG", "taos=debug");
|
||||
pretty_env_logger::init();
|
||||
let dsn =
|
||||
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
|
||||
log::debug!("dsn: {:?}", &dsn);
|
||||
|
||||
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
|
||||
|
||||
let db = "power";
|
||||
|
||||
client.exec(format!("drop database if exists {db}")).await?;
|
||||
|
||||
client
|
||||
.exec(format!("create database if not exists {db}"))
|
||||
.await?;
|
||||
|
||||
// should specify database before insert
|
||||
client.exec(format!("use {db}")).await?;
|
||||
|
||||
// SchemalessProtocol::Line
|
||||
let data = [
|
||||
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
|
||||
]
|
||||
.map(String::from)
|
||||
.to_vec();
|
||||
|
||||
let sml_data = SmlDataBuilder::default()
|
||||
.protocol(SchemalessProtocol::Line)
|
||||
.precision(SchemalessPrecision::Millisecond)
|
||||
.data(data.clone())
|
||||
.ttl(1000)
|
||||
.req_id(100u64)
|
||||
.build()?;
|
||||
assert_eq!(client.put(&sml_data).await?, ());
|
||||
|
||||
// SchemalessProtocol::Telnet
|
||||
let data = [
|
||||
"meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
|
||||
]
|
||||
.map(String::from)
|
||||
.to_vec();
|
||||
|
||||
let sml_data = SmlDataBuilder::default()
|
||||
.protocol(SchemalessProtocol::Telnet)
|
||||
.precision(SchemalessPrecision::Millisecond)
|
||||
.data(data.clone())
|
||||
.ttl(1000)
|
||||
.req_id(200u64)
|
||||
.build()?;
|
||||
assert_eq!(client.put(&sml_data).await?, ());
|
||||
|
||||
// SchemalessProtocol::Json
|
||||
let data = [
|
||||
r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
|
||||
]
|
||||
.map(String::from)
|
||||
.to_vec();
|
||||
|
||||
let sml_data = SmlDataBuilder::default()
|
||||
.protocol(SchemalessProtocol::Json)
|
||||
.precision(SchemalessPrecision::Millisecond)
|
||||
.data(data.clone())
|
||||
.ttl(1000)
|
||||
.req_id(300u64)
|
||||
.build()?;
|
||||
assert_eq!(client.put(&sml_data).await?, ());
|
||||
|
||||
client.exec(format!("drop database if exists {db}")).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
use taos::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let taos = TaosBuilder::from_dsn("taos://")?.build().await?;
|
||||
|
||||
taos.exec("DROP DATABASE IF EXISTS power").await?;
|
||||
taos.create_database("power").await?;
|
||||
taos.use_database("power").await?;
|
||||
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
|
||||
|
||||
let mut stmt = Stmt::init(&taos).await?;
|
||||
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
|
||||
|
||||
const NUM_TABLES: usize = 10;
|
||||
const NUM_ROWS: usize = 10;
|
||||
for i in 0..NUM_TABLES {
|
||||
let table_name = format!("d{}", i);
|
||||
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
|
||||
stmt.set_tbname_tags(&table_name, &tags).await?;
|
||||
for j in 0..NUM_ROWS {
|
||||
let values = vec![
|
||||
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
|
||||
ColumnView::from_floats(vec![10.3 + j as f32]),
|
||||
ColumnView::from_ints(vec![219 + j as i32]),
|
||||
ColumnView::from_floats(vec![0.31 + j as f32]),
|
||||
];
|
||||
stmt.bind(&values).await?;
|
||||
}
|
||||
stmt.add_batch().await?;
|
||||
}
|
||||
|
||||
// execute.
|
||||
let rows = stmt.execute().await?;
|
||||
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
|
||||
Ok(())
|
||||
}
|
|
@ -0,0 +1,166 @@
|
|||
use std::time::Duration;
|
||||
use std::str::FromStr;
|
||||
|
||||
use taos::*;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
pretty_env_logger::formatted_timed_builder()
|
||||
.filter_level(log::LevelFilter::Info)
|
||||
.init();
|
||||
use taos_query::prelude::*;
|
||||
let dsn = "taos://localhost:6030".to_string();
|
||||
log::info!("dsn: {}", dsn);
|
||||
let mut dsn = Dsn::from_str(&dsn)?;
|
||||
|
||||
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
|
||||
|
||||
// prepare database and table
|
||||
taos.exec_many([
|
||||
"drop topic if exists topic_meters",
|
||||
"drop database if exists power",
|
||||
"create database if not exists power WAL_RETENTION_PERIOD 86400",
|
||||
"use power",
|
||||
|
||||
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
|
||||
|
||||
"create table if not exists power.d001 using power.meters tags(1,'location')",
|
||||
|
||||
])
|
||||
.await?;
|
||||
|
||||
taos.exec_many([
|
||||
"drop database if exists db2",
|
||||
"create database if not exists db2 wal_retention_period 3600",
|
||||
"use db2",
|
||||
])
|
||||
.await?;
|
||||
|
||||
// ANCHOR: create_topic
|
||||
taos.exec_many([
|
||||
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
|
||||
])
|
||||
.await?;
|
||||
// ANCHOR_END: create_topic
|
||||
|
||||
// ANCHOR: create_consumer
|
||||
dsn.params.insert("group.id".to_string(), "abc".to_string());
|
||||
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());
|
||||
|
||||
let builder = TmqBuilder::from_dsn(&dsn)?;
|
||||
let mut consumer = builder.build().await?;
|
||||
// ANCHOR_END: create_consumer
|
||||
|
||||
// ANCHOR: subscribe
|
||||
consumer.subscribe(["topic_meters"]).await?;
|
||||
// ANCHOR_END: subscribe
|
||||
|
||||
// ANCHOR: consume
|
||||
{
|
||||
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
|
||||
|
||||
while let Some((offset, message)) = stream.try_next().await? {
|
||||
|
||||
let topic: &str = offset.topic();
|
||||
let database = offset.database();
|
||||
let vgroup_id = offset.vgroup_id();
|
||||
log::debug!(
|
||||
"topic: {}, database: {}, vgroup_id: {}",
|
||||
topic,
|
||||
database,
|
||||
vgroup_id
|
||||
);
|
||||
|
||||
match message {
|
||||
MessageSet::Meta(meta) => {
|
||||
log::info!("Meta");
|
||||
let raw = meta.as_raw_meta().await?;
|
||||
taos.write_raw_meta(&raw).await?;
|
||||
|
||||
let json = meta.as_json_meta().await?;
|
||||
let sql = json.to_string();
|
||||
if let Err(err) = taos.exec(sql).await {
|
||||
println!("maybe error: {}", err);
|
||||
}
|
||||
}
|
||||
MessageSet::Data(data) => {
|
||||
log::info!("Data");
|
||||
while let Some(data) = data.fetch_raw_block().await? {
|
||||
log::debug!("data: {:?}", data);
|
||||
}
|
||||
}
|
||||
MessageSet::MetaData(meta, data) => {
|
||||
log::info!("MetaData");
|
||||
let raw = meta.as_raw_meta().await?;
|
||||
taos.write_raw_meta(&raw).await?;
|
||||
|
||||
let json = meta.as_json_meta().await?;
|
||||
let sql = json.to_string();
|
||||
if let Err(err) = taos.exec(sql).await {
|
||||
println!("maybe error: {}", err);
|
||||
}
|
||||
|
||||
while let Some(data) = data.fetch_raw_block().await? {
|
||||
log::debug!("data: {:?}", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
consumer.commit(offset).await?;
|
||||
}
|
||||
}
|
||||
// ANCHOR_END: consume
|
||||
|
||||
// ANCHOR: assignments
|
||||
let assignments = consumer.assignments().await.unwrap();
|
||||
log::info!("assignments: {:?}", assignments);
|
||||
// ANCHOR_END: assignments
|
||||
|
||||
// seek offset
|
||||
for topic_vec_assignment in assignments {
|
||||
let topic = &topic_vec_assignment.0;
|
||||
let vec_assignment = topic_vec_assignment.1;
|
||||
for assignment in vec_assignment {
|
||||
let vgroup_id = assignment.vgroup_id();
|
||||
let current = assignment.current_offset();
|
||||
let begin = assignment.begin();
|
||||
let end = assignment.end();
|
||||
log::debug!(
|
||||
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
|
||||
topic,
|
||||
vgroup_id,
|
||||
current,
|
||||
begin,
|
||||
end
|
||||
);
|
||||
// ANCHOR: seek_offset
|
||||
let res = consumer.offset_seek(topic, vgroup_id, end).await;
|
||||
if res.is_err() {
|
||||
log::error!("seek offset error: {:?}", res);
|
||||
let a = consumer.assignments().await.unwrap();
|
||||
log::error!("assignments: {:?}", a);
|
||||
}
|
||||
// ANCHOR_END: seek_offset
|
||||
}
|
||||
|
||||
let topic_assignment = consumer.topic_assignment(topic).await;
|
||||
log::debug!("topic assignment: {:?}", topic_assignment);
|
||||
}
|
||||
|
||||
// after seek offset
|
||||
let assignments = consumer.assignments().await.unwrap();
|
||||
log::info!("after seek offset assignments: {:?}", assignments);
|
||||
|
||||
// ANCHOR: unsubscribe
|
||||
consumer.unsubscribe().await;
|
||||
// ANCHOR_END: unsubscribe
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
|
||||
taos.exec_many([
|
||||
"drop database db2",
|
||||
"drop topic topic_meters",
|
||||
"drop database power",
|
||||
])
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
|
@ -268,7 +268,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
|
|||
|
||||
bool alreadyAddGroupId(char* ctbName);
|
||||
bool isAutoTableName(char* ctbName);
|
||||
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId);
|
||||
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
|
||||
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
|
||||
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask;
|
|||
typedef struct SStreamQueue SStreamQueue;
|
||||
typedef struct SStreamTaskSM SStreamTaskSM;
|
||||
|
||||
#define SSTREAM_TASK_VER 3
|
||||
#define SSTREAM_TASK_VER 4
|
||||
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
|
||||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
||||
|
|
|
@ -2188,10 +2188,14 @@ _end:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId) {
|
||||
void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){
|
||||
char tmp[TSDB_TABLE_NAME_LEN] = {0};
|
||||
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%" PRIu64, groupId);
|
||||
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
|
||||
if (stbName == NULL){
|
||||
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
|
||||
}else{
|
||||
snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId);
|
||||
}
|
||||
ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
|
||||
strcat(ctbName, tmp);
|
||||
}
|
||||
|
||||
|
@ -2201,6 +2205,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0
|
|||
|
||||
bool alreadyAddGroupId(char* ctbName) {
|
||||
size_t len = strlen(ctbName);
|
||||
if (len == 0) return false;
|
||||
size_t _location = len - 1;
|
||||
while (_location > 0) {
|
||||
if (ctbName[_location] < '0' || ctbName[_location] > '9') {
|
||||
|
|
|
@ -51,8 +51,8 @@
|
|||
|
||||
#define ENCODESQL() \
|
||||
do { \
|
||||
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
|
||||
if (pReq->sqlLen > 0 && pReq->sql != NULL) { \
|
||||
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
|
||||
if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -3025,7 +3025,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
|||
|
||||
ENCODESQL();
|
||||
|
||||
if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
|
@ -3140,7 +3140,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pReq->keepTimeOffset) < 0) return -1;
|
||||
ENCODESQL();
|
||||
if (tEncodeI32(&encoder, pReq->withArbitrator) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->withArbitrator) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
|
|
@ -35,6 +35,7 @@ int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup);
|
|||
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||
|
||||
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup);
|
||||
|
||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
|
|
|
@ -260,6 +260,14 @@ int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
|
@ -535,10 +543,10 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
|||
int32_t vgId = arbGroupDup.vgId;
|
||||
int64_t nowMs = taosGetTimestampMs();
|
||||
|
||||
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
|
||||
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
|
||||
SArbAssignedLeader* pAssignedLeader = &arbGroupDup.assignedLeader;
|
||||
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
|
||||
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
|
||||
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
|
||||
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
|
||||
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
|
||||
|
||||
// 1. has assigned && is sync => send req
|
||||
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) {
|
||||
|
@ -667,9 +675,16 @@ static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) {
|
|||
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
|
||||
newGroup.version = req.version;
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
|
||||
if (!pOldGroup) {
|
||||
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
|
||||
return 0;
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pOldGroup);
|
||||
|
||||
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
|
||||
mError("vgId:%d, arb failed to update arbgroup, since %s", req.vgId, terrstr());
|
||||
mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr());
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -292,16 +292,16 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons
|
|||
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
|
||||
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
|
||||
if (tlen <= 0){
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
void *buf = rpcMallocCont(tlen);
|
||||
if (buf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){
|
||||
if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
|
||||
rpcFreeCont(buf);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
}
|
||||
pMsg->info.rsp = buf;
|
||||
pMsg->info.rspLen = tlen;
|
||||
|
@ -316,7 +316,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
|||
SMqConsumerObj *pConsumer = NULL;
|
||||
|
||||
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
|
|
@ -1209,6 +1209,25 @@ static int32_t mndSetDropDbPrepareLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
|||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
SArbGroup *pArbGroup = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pArbGroup->dbUid == pDb->uid) {
|
||||
if (mndSetDropArbGroupPrepareLogs(pTrans,pArbGroup) != 0) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pArbGroup);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pArbGroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
|
|||
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
|
||||
for (int32_t j = 0; j < innerSz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pArray, j);
|
||||
pTask->ver = SSTREAM_TASK_VER;
|
||||
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
pTask->ver = SSTREAM_TASK_VER;
|
||||
}
|
||||
if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1104,14 +1104,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
} else if (createReq.tagVer > 0 || createReq.colVer > 0) {
|
||||
int32_t tagDelta = createReq.tagVer - pStb->tagVer;
|
||||
int32_t colDelta = createReq.colVer - pStb->colVer;
|
||||
int32_t verDelta = tagDelta + colDelta;
|
||||
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
|
||||
createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
|
||||
if (tagDelta <= 0 && colDelta <= 0) {
|
||||
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) {
|
||||
} else if ((tagDelta == 1 && colDelta == 0) ||
|
||||
(tagDelta == 0 && colDelta == 1) ||
|
||||
(pStb->colVer == 1 && createReq.colVer > 1) ||
|
||||
(pStb->tagVer == 1 && createReq.tagVer > 1)) {
|
||||
isAlter = true;
|
||||
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
|
||||
} else {
|
||||
|
|
|
@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, NULL, 0);
|
||||
|
||||
pTask->ver = SSTREAM_TASK_VER;
|
||||
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
pTask->ver = SSTREAM_TASK_VER;
|
||||
}
|
||||
tEncodeStreamTask(&encoder, pTask);
|
||||
|
||||
int32_t size = encoder.pos;
|
||||
|
@ -2153,41 +2155,60 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
|
||||
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||
if (pStream == NULL) {
|
||||
mError("failed to find the stream:0x%" PRIx64 " not handle the checkpoint req", req.streamId);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
mWarn("failed to find the stream:0x%" PRIx64 ", not handle the checkpoint req, try to acquire in buf", req.streamId);
|
||||
|
||||
return -1;
|
||||
// not in meta-store yet, try to acquire the task in exec buffer
|
||||
// the checkpoint req arrives too soon before the completion of the create stream trans.
|
||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||
void* p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||
if (p == NULL) {
|
||||
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint req", req.streamId);
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
return -1;
|
||||
} else {
|
||||
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
|
||||
req.streamId, req.taskId);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||
int32_t numOfTasks = (pStream == NULL)? 0: mndGetNumOfStreamTasks(pStream);
|
||||
|
||||
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||
if (pReqTaskList == NULL) {
|
||||
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||
doAddTaskId(pList, req.taskId, pStream->uid, numOfTasks);
|
||||
doAddTaskId(pList, req.taskId, req.streamId, numOfTasks);
|
||||
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
||||
|
||||
pReqTaskList = (SArray **)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||
} else {
|
||||
doAddTaskId(*pReqTaskList, req.taskId, pStream->uid, numOfTasks);
|
||||
doAddTaskId(*pReqTaskList, req.taskId, req.streamId, numOfTasks);
|
||||
}
|
||||
|
||||
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||
if (total == numOfTasks) { // all tasks has send the reqs
|
||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
||||
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
||||
|
||||
// TODO:handle error
|
||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||
if (pStream != NULL) { // TODO:handle error
|
||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||
} else {
|
||||
// todo: wait for the create stream trans completed, and launch the checkpoint trans
|
||||
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||
// sleep(500ms)
|
||||
}
|
||||
|
||||
// remove this entry
|
||||
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
|
||||
|
||||
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
|
||||
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
|
||||
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
|
||||
}
|
||||
|
||||
if (pStream != NULL) {
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
{
|
||||
|
|
|
@ -79,6 +79,7 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
|||
int32_t nKey = 0;
|
||||
int32_t nData = 0;
|
||||
STbDbKey key;
|
||||
SMetaInfo info;
|
||||
|
||||
*ppData = NULL;
|
||||
for (;;) {
|
||||
|
@ -91,7 +92,8 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
if (key.version < pReader->sver) {
|
||||
if (key.version < pReader->sver //
|
||||
|| metaGetInfo(pReader->pMeta, key.uid, &info, NULL) == TSDB_CODE_NOT_FOUND) {
|
||||
tdbTbcMoveToNext(pReader->pTbc);
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -101,10 +101,7 @@ int32_t tqInitialize(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
|
|||
if (varTbName != NULL && varTbName != (void*)-1) {
|
||||
name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||
memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
|
||||
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
|
||||
buildCtbNameAddGroupId(name, groupId);
|
||||
if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
|
||||
buildCtbNameAddGroupId(stbFullName, name, groupId);
|
||||
}
|
||||
} else if (stbFullName) {
|
||||
name = buildCtbNameByGroupId(stbFullName, groupId);
|
||||
|
@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
|
|||
int64_t gid, bool newSubTableRule) {
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
|
||||
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
|
||||
!alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
|
||||
pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
|
||||
strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
|
||||
buildCtbNameAddGroupId(pCreateTableReq->name, gid);
|
||||
buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
|
||||
// tqDebug("gen name from:%s", pDataBlock->info.parTbName);
|
||||
} else {
|
||||
pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
|
||||
|
@ -672,10 +672,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
|
|||
memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
|
||||
buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
|
||||
} else {
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
|
||||
!isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
|
||||
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
|
||||
!alreadyAddGroupId(dstTableName) && groupId != 0) {
|
||||
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
|
||||
buildCtbNameAddGroupId(dstTableName, groupId);
|
||||
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
buildCtbNameAddGroupId(NULL, dstTableName, groupId);
|
||||
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
|
||||
buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -695,8 +695,8 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
|
|||
return &pIter->cv;
|
||||
}
|
||||
|
||||
if (pIter->iColData < pIter->pRow->pBlockData->nColData) {
|
||||
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData], pIter->pRow->iRow, &pIter->cv);
|
||||
if (pIter->iColData <= pIter->pRow->pBlockData->nColData) {
|
||||
tColDataGetValue(&pIter->pRow->pBlockData->aColData[pIter->iColData - 1], pIter->pRow->iRow, &pIter->cv);
|
||||
++pIter->iColData;
|
||||
return &pIter->cv;
|
||||
} else {
|
||||
|
|
|
@ -879,7 +879,7 @@ static int32_t sysTableGetGeomText(char* iGeom, int32_t nGeom, char** output, in
|
|||
char* outputWKT = NULL;
|
||||
|
||||
if (nGeom == 0) {
|
||||
if (!(*output = strdup(""))) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (!(*output = taosStrdup(""))) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
*nOutput = 0;
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -496,7 +496,6 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
|
|||
|
||||
ASSERT(pBlock != NULL);
|
||||
if (pBlock->pDataBlock == NULL) {
|
||||
// tscError("pBlock->pDataBlock == NULL");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -514,10 +513,6 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
|
|||
}
|
||||
|
||||
// point interpolation does not require the end key time window interpolation.
|
||||
// if (pointInterpQuery) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
// interpolation query does not generate the time window end interpolation
|
||||
done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
|
||||
if (!done) {
|
||||
|
|
|
@ -748,16 +748,52 @@ insert_query(A) ::= INSERT INTO full_table_name(C) query_or_subquery(B).
|
|||
|
||||
/************************************************ tags_literal *************************************************************/
|
||||
tags_literal(A) ::= NK_INTEGER(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); }
|
||||
tags_literal(A) ::= NK_INTEGER(B) NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_INTEGER(B) NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_INTEGER(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_INTEGER NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_INTEGER NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_INTEGER(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_INTEGER NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_INTEGER NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_FLOAT(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_DOUBLE, &B, NULL); }
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_FLOAT(C). {
|
||||
SToken t = B;
|
||||
|
@ -771,29 +807,113 @@ tags_literal(A) ::= NK_MINUS(B) NK_FLOAT(C).
|
|||
}
|
||||
|
||||
tags_literal(A) ::= NK_BIN(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); }
|
||||
tags_literal(A) ::= NK_BIN(B) NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_BIN(B) NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_BIN(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_BIN NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_BIN NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_BIN(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_BIN NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_BIN NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_HEX(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &B, NULL); }
|
||||
tags_literal(A) ::= NK_HEX(B) NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_HEX(B) NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_HEX(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_HEX NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_PLUS(B) NK_HEX NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_HEX(C). {
|
||||
SToken t = B;
|
||||
t.n = (C.z + C.n) - B.z;
|
||||
A = createRawValueNode(pCxt, TSDB_DATA_TYPE_UBIGINT, &t, NULL);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_HEX NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_MINUS(B) NK_HEX NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
|
||||
tags_literal(A) ::= NK_STRING(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B, NULL); }
|
||||
tags_literal(A) ::= NK_STRING(B) NK_PLUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_STRING(B) NK_MINUS duration_literal(C). {
|
||||
SToken l = B;
|
||||
SToken r = getTokenFromRawExprNode(pCxt, C);
|
||||
l.n = (r.z + r.n) - l.z;
|
||||
A = createRawValueNodeExt(pCxt, TSDB_DATA_TYPE_BINARY, &l, NULL, C);
|
||||
}
|
||||
tags_literal(A) ::= NK_BOOL(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_BOOL, &B, NULL); }
|
||||
tags_literal(A) ::= NULL(B). { A = createRawValueNode(pCxt, TSDB_DATA_TYPE_NULL, &B, NULL); }
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -278,6 +278,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
|
|||
pTask->chkInfo.numOfNotReady = 0;
|
||||
pTask->chkInfo.transId = 0;
|
||||
pTask->chkInfo.dispatchCheckpointTrigger = false;
|
||||
pTask->chkInfo.downstreamAlignNum = 0;
|
||||
|
||||
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
|
||||
if (clearChkpReadyMsg) {
|
||||
|
|
|
@ -580,12 +580,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
} else {
|
||||
char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
|
||||
pTask->subtableWithoutMd5 != 1 &&
|
||||
if(pTask->subtableWithoutMd5 != 1 &&
|
||||
!isAutoTableName(pDataBlock->info.parTbName) &&
|
||||
!alreadyAddGroupId(pDataBlock->info.parTbName) &&
|
||||
groupId != 0){
|
||||
buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId);
|
||||
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
|
||||
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
|
||||
buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
|
||||
|
|
|
@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
void* buf = NULL;
|
||||
int32_t len;
|
||||
int32_t code;
|
||||
pTask->ver = SSTREAM_TASK_VER;
|
||||
tEncodeSize(tEncodeStreamTask, pTask, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
|
@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
pTask->ver = SSTREAM_TASK_VER;
|
||||
}
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, len);
|
||||
tEncodeStreamTask(&encoder, pTask);
|
||||
|
@ -648,14 +650,17 @@ SStreamTask* streamMetaAcquireOneTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
||||
|
||||
// not safe to use the pTask->id.idStr, since pTask may be released by other threads when print logs.
|
||||
if (ref > 0) {
|
||||
stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref);
|
||||
stTrace("s-task:0x%x release task, ref:%d", taskId, ref);
|
||||
} else if (ref == 0) {
|
||||
stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr);
|
||||
stTrace("s-task:0x%x all refs are gone, free it", taskId);
|
||||
tFreeStreamTask(pTask);
|
||||
} else if (ref < 0) {
|
||||
stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr);
|
||||
stError("task ref is invalid, ref:%d, 0x%x", ref, taskId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -824,13 +829,6 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
|||
return chkpId;
|
||||
}
|
||||
|
||||
static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
taosArrayDestroy(pRecycleList);
|
||||
}
|
||||
|
||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
||||
TBC* pCur = NULL;
|
||||
void* pKey = NULL;
|
||||
|
@ -847,10 +845,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
int32_t vgId = pMeta->vgId;
|
||||
stInfo("vgId:%d load stream tasks from meta files", vgId);
|
||||
|
||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||
stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno));
|
||||
int32_t code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno));
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return -1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
|
@ -859,20 +858,18 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
stError("vgId:%d failed to load stream task from meta-files, code:%s", vgId, tstrerror(terrno));
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
if (tDecodeStreamTask(&decoder, pTask) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
tFreeStreamTask(pTask);
|
||||
stError(
|
||||
"vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild "
|
||||
"stream manually",
|
||||
vgId, tsDataDir);
|
||||
return -1;
|
||||
break;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
|
@ -892,10 +889,11 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (p == NULL) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) {
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
code = pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
|
||||
if (code < 0) {
|
||||
stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
continue;
|
||||
}
|
||||
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
|
@ -907,9 +905,10 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) {
|
||||
doClear(pKey, pVal, pCur, pRecycleList);
|
||||
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
|
||||
taosArrayPop(pMeta->pTaskList);
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
|
@ -925,10 +924,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
|
||||
if (tdbTbcClose(pCur) < 0) {
|
||||
stError("vgId:%d failed to close meta-file cursor", vgId);
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return -1;
|
||||
stError("vgId:%d failed to close meta-file cursor, code:%s, continue", vgId, tstrerror(terrno));
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pRecycleList) > 0) {
|
||||
|
@ -942,8 +940,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
|
||||
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
|
||||
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);
|
||||
|
||||
taosArrayDestroy(pRecycleList);
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
|
||||
|
|
|
@ -216,7 +216,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
|
||||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
|
||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1;
|
||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
|
||||
|
||||
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
|
||||
|
@ -287,7 +287,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||
}
|
||||
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
|
||||
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
|
||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
|
||||
}
|
||||
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
|
||||
|
||||
tEndDecode(pDecoder);
|
||||
|
|
|
@ -866,9 +866,14 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
|
|||
SyncTerm term = -1;
|
||||
SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
|
||||
SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex);
|
||||
errno = 0;
|
||||
|
||||
if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) {
|
||||
term = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1);
|
||||
if (term < 0 && (errno == ENFILE || errno == EMFILE)) {
|
||||
sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, terrstr(), index + 1);
|
||||
return -1;
|
||||
}
|
||||
if ((index + 1 < firstVer) || (term < 0) ||
|
||||
(term != pMsg->lastMatchTerm && (index + 1 == firstVer || index == firstVer))) {
|
||||
ASSERT(term >= 0 || terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||
|
|
|
@ -585,11 +585,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
|
|||
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
||||
void* pool = pThrd->pool;
|
||||
STrans* pTranInst = pThrd->pTransInst;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||
size_t klen = strlen(key);
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key) + 1);
|
||||
taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, klen);
|
||||
|
||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||
QUEUE_INIT(&nList->msgQ);
|
||||
|
@ -624,11 +625,12 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
|
|||
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
|
||||
void* pool = pThrd->pool;
|
||||
STrans* pTransInst = pThrd->pTransInst;
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
|
||||
size_t klen = strlen(key);
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, klen);
|
||||
if (plist == NULL) {
|
||||
SConnList list = {0};
|
||||
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, strlen(key) + 1);
|
||||
taosHashPut((SHashObj*)pool, key, klen, (void*)&list, sizeof(list));
|
||||
plist = taosHashGet(pool, key, klen);
|
||||
|
||||
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
|
||||
QUEUE_INIT(&nList->msgQ);
|
||||
|
@ -714,7 +716,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
cliDestroyConnMsgs(conn, false);
|
||||
|
||||
if (conn->list == NULL) {
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1);
|
||||
conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr));
|
||||
}
|
||||
|
||||
SConnList* pList = conn->list;
|
||||
|
@ -1279,7 +1281,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
|
||||
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
|
||||
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1);
|
||||
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr));
|
||||
int64_t cTimestamp = taosGetTimestampMs();
|
||||
if (item != NULL) {
|
||||
int32_t elapse = cTimestamp - item->timestamp;
|
||||
|
@ -1291,7 +1293,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
|||
}
|
||||
} else {
|
||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem));
|
||||
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -1471,7 +1473,8 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
|
|||
}
|
||||
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
|
||||
uint32_t addr = 0;
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
|
||||
size_t len = strlen(fqdn);
|
||||
uint32_t* v = taosHashGet(cache, fqdn, len);
|
||||
if (v == NULL) {
|
||||
addr = taosGetIpv4FromFqdn(fqdn);
|
||||
if (addr == 0xffffffff) {
|
||||
|
@ -1480,7 +1483,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
|
|||
return addr;
|
||||
}
|
||||
|
||||
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
|
||||
taosHashPut(cache, fqdn, len, &addr, sizeof(addr));
|
||||
} else {
|
||||
addr = *v;
|
||||
}
|
||||
|
@ -1490,13 +1493,14 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
|
|||
// impl later
|
||||
uint32_t addr = taosGetIpv4FromFqdn(fqdn);
|
||||
if (addr != 0xffffffff) {
|
||||
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
|
||||
size_t len = strlen(fqdn);
|
||||
uint32_t* v = taosHashGet(cache, fqdn, len);
|
||||
if (addr != *v) {
|
||||
char old[64] = {0}, new[64] = {0};
|
||||
tinet_ntoa(old, *v);
|
||||
tinet_ntoa(new, addr);
|
||||
tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new);
|
||||
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
|
||||
taosHashPut(cache, fqdn, len, &addr, sizeof(addr));
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
@ -1537,21 +1541,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
if (tmsgIsValid(pMsg->msg.msgType)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pMsg->msg.msgType));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count + 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
|
||||
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
|
||||
char addr[TSDB_FQDN_LEN + 64] = {0};
|
||||
|
@ -1704,9 +1693,8 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
|||
uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
|
||||
char key[TSDB_FQDN_LEN + 64] = {0};
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
// SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key));
|
||||
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key));
|
||||
size_t klen = strlen(key);
|
||||
SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen);
|
||||
if (ppBatchList == NULL || *ppBatchList == NULL) {
|
||||
SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList));
|
||||
QUEUE_INIT(&pBatchList->wq);
|
||||
|
@ -1730,7 +1718,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) {
|
|||
|
||||
QUEUE_PUSH(&pBatchList->wq, &pBatch->listq);
|
||||
|
||||
taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*));
|
||||
taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*));
|
||||
} else {
|
||||
if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) {
|
||||
SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch));
|
||||
|
@ -1800,21 +1788,6 @@ static void cliAsyncCb(uv_async_t* handle) {
|
|||
QUEUE_MOVE(&item->qmsg, &wq);
|
||||
taosThreadMutexUnlock(&item->mtx);
|
||||
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
void* pIter = taosHashIterate(pThrd->msgCount, NULL);
|
||||
while (pIter != NULL) {
|
||||
int* count = pIter;
|
||||
size_t len = 0;
|
||||
char* key = taosHashGetKey(pIter, &len);
|
||||
if (*count != 0) {
|
||||
tDebug("key: %s count: %d", key, *count);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pThrd->msgCount, pIter);
|
||||
}
|
||||
tDebug("all conn count: %d", pThrd->newConnCount);
|
||||
}
|
||||
|
||||
int8_t supportBatch = pTransInst->supportBatch;
|
||||
if (supportBatch == 0) {
|
||||
cliNoBatchDealReq(&wq, pThrd);
|
||||
|
@ -1971,8 +1944,9 @@ static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) {
|
|||
if (pMsg == NULL) {
|
||||
return;
|
||||
}
|
||||
if (param != NULL) {
|
||||
SCliThrd* pThrd = param;
|
||||
|
||||
SCliThrd* pThrd = param;
|
||||
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) {
|
||||
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
|
||||
}
|
||||
destroyCmsg(pMsg);
|
||||
|
@ -1984,12 +1958,9 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
|
|||
SCliMsg* pMsg = arg->param1;
|
||||
SCliThrd* pThrd = arg->param2;
|
||||
|
||||
tDebug("destroy Ahandle A");
|
||||
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
|
||||
tDebug("destroy Ahandle B");
|
||||
if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
|
||||
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
|
||||
}
|
||||
tDebug("destroy Ahandle C");
|
||||
|
||||
transDestroyConnCtx(pMsg->ctx);
|
||||
transFreeMsg(pMsg->msg.pCont);
|
||||
|
@ -2411,20 +2382,6 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
|
|||
tGTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
}
|
||||
}
|
||||
if (rpcDebugFlag & DEBUG_TRACE) {
|
||||
if (tmsgIsValid(pResp->msgType - 1)) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "%s", TMSG_INFO(pResp->msgType - 1));
|
||||
int* count = taosHashGet(pThrd->msgCount, buf, sizeof(buf));
|
||||
if (NULL == 0) {
|
||||
int localCount = 0;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
} else {
|
||||
int localCount = *count - 1;
|
||||
taosHashPut(pThrd->msgCount, buf, sizeof(buf), &localCount, sizeof(localCount));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pCtx->pSem || pCtx->syncMsgRef != 0) {
|
||||
tGTrace("%s conn %p(sync) handle resp", CONN_GET_INST_LABEL(pConn), pConn);
|
||||
if (pCtx->pSem) {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#include "tgeosctx.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define QUEUE_THRESHOLD 1000 * 1000
|
||||
#define QUEUE_THRESHOLD (1000 * 1000)
|
||||
|
||||
typedef void *(*ThreadFp)(void *param);
|
||||
|
||||
|
|
|
@ -936,6 +936,9 @@ sql_error alter table st_bool_i1 set tag tagname="123abc"
|
|||
sql alter table st_bool_i2 set tag tagname="123"
|
||||
sql_error alter table st_bool_i3 set tag tagname=abc
|
||||
sql_error alter table st_bool_i4 set tag tagname="abc"
|
||||
sql_error alter table st_bool_i4 set tag tagname=now
|
||||
sql_error alter table st_bool_i4 set tag tagname=now()+1d
|
||||
sql_error alter table st_bool_i4 set tag tagname=1+1d
|
||||
sql_error alter table st_bool_i5 set tag tagname=" "
|
||||
sql_error alter table st_bool_i6 set tag tagname=''
|
||||
|
||||
|
|
|
@ -913,6 +913,8 @@ sql_error alter table st_int_e19 set tag tagname=123abc
|
|||
sql_error alter table st_int_e20 set tag tagname="123abc"
|
||||
sql_error alter table st_int_e22 set tag tagname=abc
|
||||
sql_error alter table st_int_e23 set tag tagname="abc"
|
||||
sql_error alter table st_int_e25 set tag tagname=1+1d
|
||||
sql_error alter table st_int_e25 set tag tagname="1"+1d
|
||||
sql_error alter table st_int_e24 set tag tagname=" "
|
||||
sql_error alter table st_int_e25 set tag tagname=''
|
||||
sql alter table st_int_e26_1 set tag tagname='123'
|
||||
|
|
|
@ -132,6 +132,77 @@ sql show tags from st_timestamp_22
|
|||
if $data05 != -1 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_23 using mt_timestamp tags (1+ 1d )
|
||||
sql show tags from st_timestamp_23
|
||||
if $data05 != 86400001 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_24 using mt_timestamp tags (-0 + 1d)
|
||||
sql show tags from st_timestamp_24
|
||||
if $data05 != 86400000 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_25 using mt_timestamp tags ("-0" -1s)
|
||||
sql show tags from st_timestamp_25
|
||||
if $data05 != -1000 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_26 using mt_timestamp tags (0b01 -1a)
|
||||
sql show tags from st_timestamp_26
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_27 using mt_timestamp tags (0b01 -1s)
|
||||
sql show tags from st_timestamp_27
|
||||
if $data05 != -999 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_28 using mt_timestamp tags ("0x01" +1u)
|
||||
sql show tags from st_timestamp_28
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_29 using mt_timestamp tags (0x01 +1b)
|
||||
sql show tags from st_timestamp_29
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_30 using mt_timestamp tags (-0b00 -0a)
|
||||
sql show tags from st_timestamp_30
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_31 using mt_timestamp tags ("-0x00" +1u)
|
||||
sql show tags from st_timestamp_31
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_32 using mt_timestamp tags (-0x00 +1b)
|
||||
sql show tags from st_timestamp_32
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_33 using mt_timestamp tags (now +1b)
|
||||
sql show tags from st_timestamp_33
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_34 using mt_timestamp tags ("now()" +1b)
|
||||
sql show tags from st_timestamp_34
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_35 using mt_timestamp tags (today() +1d)
|
||||
sql show tags from st_timestamp_35
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql create table st_timestamp_36 using mt_timestamp tags ("today()" +1d)
|
||||
sql show tags from st_timestamp_36
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
## case 01: insert values for test column values
|
||||
sql insert into st_timestamp_0 values(now,NULL)
|
||||
|
@ -249,6 +320,76 @@ sql select ts, cast(c as bigint) from st_timestamp_22
|
|||
if $data01 != -1 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_23 values(now,1+ 1d )
|
||||
sql select ts, cast(c as bigint) from st_timestamp_23
|
||||
if $data01 != 86400001 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_24 values(now,-0 + 1d)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_24
|
||||
if $data01 != 86400000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_25 values(now,"-0" -1s)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_25
|
||||
if $data01 != -1000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_26 values(now,0b01 -1a)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_26
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_27 values(now,+0b01 -1s)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_27
|
||||
if $data01 != -999 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_28 values(now,"+0x01" +1u)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_28
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_29 values(now,0x01 +1b)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_29
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_30 values(now,-0b00 -0a)
|
||||
sql show tags from st_timestamp_30
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_31 values(now,"-0x00" +1u)
|
||||
sql show tags from st_timestamp_31
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_32 values (now,-0x00 +1b)
|
||||
sql show tags from st_timestamp_32
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_33 values(now,now +1b)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_33
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_34 values(now,"now()" +1b)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_34
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_35 values(now,today() +1d)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_35
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_36 values(now,"today()" +1d)
|
||||
sql select ts, cast(c as bigint) from st_timestamp_36
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
## case 02: dynamic create table for test tag values
|
||||
sql insert into st_timestamp_100 using mt_timestamp tags(NULL) values(now, NULL)
|
||||
|
@ -450,6 +591,136 @@ sql select ts, cast(c as bigint) from st_timestamp_1022
|
|||
if $data01 != -1 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1023 using mt_timestamp tags(+1+1d) values(now,+1+ 1d )
|
||||
sql show tags from st_timestamp_1023
|
||||
if $data05 != 86400001 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1023
|
||||
if $data01 != 86400001 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1024 using mt_timestamp tags(-0+1d) values(now,-0 + 1d)
|
||||
sql show tags from st_timestamp_1024
|
||||
if $data05 != 86400000 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1024
|
||||
if $data01 != 86400000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1025 using mt_timestamp tags("-0" -1s) values(now,"-0" -1s)
|
||||
sql show tags from st_timestamp_1025
|
||||
if $data05 != -1000 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1025
|
||||
if $data01 != -1000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1026 using mt_timestamp tags(+0b01-1a) values(now,+0b01 -1a)
|
||||
sql show tags from st_timestamp_1026
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1026
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1027 using mt_timestamp tags(0b01-1s) values(now,0b01 -1s)
|
||||
sql show tags from st_timestamp_1027
|
||||
if $data05 != -999 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1027
|
||||
if $data01 != -999 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1028 using mt_timestamp tags("0x01" + 1u) values(now,"0x01" +1u)
|
||||
sql show tags from st_timestamp_1028
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1028
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1029 using mt_timestamp tags(+0x01 +1b) values(now,+0x01 +1b)
|
||||
sql show tags from st_timestamp_1029
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1029
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1030 using mt_timestamp tags (-0b00 -0a) values(now,-0b00 -0a)
|
||||
sql show tags from st_timestamp_1030
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1030
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1031 using mt_timestamp tags ("-0x00" +1u) values(now,"-0x00" +1u)
|
||||
sql show tags from st_timestamp_1031
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1031
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1032 using mt_timestamp tags (-0x00 +1b) values(now,-0x00 +1b)
|
||||
sql show tags from st_timestamp_1032
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1032
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1033 using mt_timestamp tags(now+1b) values(now,now +1b)
|
||||
sql show tags from st_timestamp_1033
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1033
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1034 using mt_timestamp tags("now" +1b) values(now,"now()" +1b)
|
||||
sql show tags from st_timestamp_1034
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1034
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1035 using mt_timestamp tags(today() + 1d) values(now,today() +1d)
|
||||
sql show tags from st_timestamp_1035
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1035
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql insert into st_timestamp_1036 using mt_timestamp tags("today" +1d) values(now,"today()" +1d)
|
||||
sql show tags from st_timestamp_1036
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql select ts, cast(c as bigint) from st_timestamp_1036
|
||||
if $data01 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
### case 03: alter tag values
|
||||
sql alter table st_timestamp_0 set tag tagname=NULL
|
||||
|
@ -567,12 +838,85 @@ sql show tags from st_timestamp_22
|
|||
if $data05 != -1 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_23 set tag tagname=1+ 1d
|
||||
sql show tags from st_timestamp_23
|
||||
if $data05 != 86400001 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_24 set tag tagname=-0 + 1d
|
||||
sql show tags from st_timestamp_24
|
||||
if $data05 != 86400000 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_25 set tag tagname="-0" -1s
|
||||
sql show tags from st_timestamp_25
|
||||
if $data05 != -1000 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_26 set tag tagname=+0b01 -1a
|
||||
sql show tags from st_timestamp_26
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_27 set tag tagname=0b01 -1s
|
||||
sql show tags from st_timestamp_27
|
||||
if $data05 != -999 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_28 set tag tagname="0x01" +1u
|
||||
sql show tags from st_timestamp_28
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_29 set tag tagname=0x01 +1b
|
||||
sql show tags from st_timestamp_29
|
||||
if $data05 != 1 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_30 set tag tagname==-0b00 -0a
|
||||
sql show tags from st_timestamp_30
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_31 set tag tagname="-0x00" +1u
|
||||
sql show tags from st_timestamp_31
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_32 set tag tagname=-0x00 +1b
|
||||
sql show tags from st_timestamp_32
|
||||
if $data05 != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_33 set tag tagname=now +1b
|
||||
sql show tags from st_timestamp_33
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_34 set tag tagname="now()" +1b
|
||||
sql show tags from st_timestamp_34
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_35 set tag tagname=today( ) +1d
|
||||
sql show tags from st_timestamp_35
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
sql alter table st_timestamp_36 set tag tagname="today()" +1d
|
||||
sql show tags from st_timestamp_36
|
||||
if $data05 < 1711883186000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
## case 04: illegal input
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags (123abc)
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags ("123abc")
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags (abc)
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags ("abc")
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags (now()+1d+1s)
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags (1+1y)
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags (0x01+1b+1a)
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags (" ")
|
||||
sql_error create table st_timestamp_e0 using mt_timestamp tags ('')
|
||||
sql_error create table st_timestamp_104 using mt_timestamp tags ("-123.1")
|
||||
|
@ -590,5 +934,7 @@ sql_error create table st_timestamp_115 using mt_timestamp tags (922337203685477
|
|||
sql create table st_timestamp_116 using mt_timestamp tags (-9223372036854775808)
|
||||
sql_error create table st_timestamp_117 using mt_timestamp tags (-9223372036854775809)
|
||||
sql_error insert into st_timestamp_118 using mt_timestamp tags(9223372036854775807) values(9223372036854775807, 9223372036854775807)
|
||||
sql_error insert into st_timestamp_119 using mt_timestamp tags(1+1s-1s) values(now, now)
|
||||
sql_error insert into st_timestamp_120 using mt_timestamp tags(1-1s) values(now, now-1s+1d)
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -299,6 +299,8 @@ sql_error create table st_varbinary_1012 using mt_varbinary tags(tRue)
|
|||
sql_error create table st_varbinary_1013 using mt_varbinary tags(FalsE)
|
||||
sql_error create table st_varbinary_1014 using mt_varbinary tags(noW)
|
||||
sql_error create table st_varbinary_1015 using mt_varbinary tags(toDay)
|
||||
sql_error create table st_varbinary_1016 using mt_varbinary tags(now()+1s)
|
||||
sql_error create table st_varbinary_1017 using mt_varbinary tags(1+1s)
|
||||
sql_error insert into st_varbinary_106 using mt_varbinary tags(+0123) values(now, NULL);
|
||||
sql_error insert into st_varbinary_107 using mt_varbinary tags(-01.23) values(now, NULL);
|
||||
sql_error insert into st_varbinary_108 using mt_varbinary tags(+0x01) values(now, NULL);
|
||||
|
@ -309,6 +311,8 @@ sql_error insert into st_varbinary_1012 using mt_varbinary tags(tRue) values(no
|
|||
sql_error insert into st_varbinary_1013 using mt_varbinary tags(FalsE) values(now, NULL);
|
||||
sql_error insert into st_varbinary_1014 using mt_varbinary tags(noW) values(now, NULL);
|
||||
sql_error insert into st_varbinary_1015 using mt_varbinary tags(toDay) values(now, NULL);
|
||||
sql_error insert into st_varbinary_1016 using mt_varbinary tags(now()+1s) values(now, NULL);
|
||||
sql_error insert into st_varbinary_1017 using mt_varbinary tags(1+1s) values(now, NULL);
|
||||
sql_error insert into st_varbinary_106 using mt_varbinary tags(NULL) values(now(), +0123)
|
||||
sql_error insert into st_varbinary_107 using mt_varbinary tags(NULL) values(now(), -01.23)
|
||||
sql_error insert into st_varbinary_108 using mt_varbinary tags(NULL) values(now(), +0x01)
|
||||
|
@ -319,5 +323,7 @@ sql_error insert into st_varbinary_1012 using mt_varbinary tags(NULL) values(no
|
|||
sql_error insert into st_varbinary_1013 using mt_varbinary tags(NULL) values(now(), FalsE)
|
||||
sql_error insert into st_varbinary_1014 using mt_varbinary tags(NULL) values(now(), noW)
|
||||
sql_error insert into st_varbinary_1015 using mt_varbinary tags(NULL) values(now(), toDay)
|
||||
sql_error insert into st_varbinary_1016 using mt_varbinary tags(NULL) values(now(), now()+1s)
|
||||
sql_error insert into st_varbinary_1017 using mt_varbinary tags(NULL) values(now(), 1+1s)
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -410,6 +410,17 @@ endi
|
|||
|
||||
|
||||
# case 04: illegal input
|
||||
sql_error create table st_varchar_100 using mt_varchar tags(now+1d)
|
||||
sql_error create table st_varchar_101 using mt_varchar tags(toDay+1d)
|
||||
sql_error create table st_varchar_102 using mt_varchar tags(1+1b)
|
||||
sql_error create table st_varchar_103 using mt_varchar tags(0x01+1d)
|
||||
sql_error create table st_varchar_104 using mt_varchar tags(0b01+1s)
|
||||
sql_error insert into st_varchar_1100 using mt_varchar tags('now') values(now(),now+1d)
|
||||
sql_error insert into st_varchar_1101 using mt_varchar tags('now') values(now(),toDay+1d)
|
||||
sql_error insert into st_varchar_1102 using mt_varchar tags('now') values(now(),1+1b)
|
||||
sql_error insert into st_varchar_1103 using mt_varchar tags('now') values(now(),0x01+1d)
|
||||
sql_error insert into st_varchar_1104 using mt_varchar tags('now') values(now(),0b01+1s)
|
||||
sql_error alter table st_varchar_15 set tag tagname=now()+1d
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -78,14 +78,55 @@ class TDTestCase:
|
|||
tdLog.info(cmd)
|
||||
os.system(cmd)
|
||||
|
||||
def case1(self):
|
||||
|
||||
tdSql.execute(f'create database if not exists d1 vgroups 1')
|
||||
tdSql.execute(f'use d1')
|
||||
tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
|
||||
tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
|
||||
tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
|
||||
|
||||
tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
|
||||
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
|
||||
|
||||
tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT "
|
||||
"_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
|
||||
|
||||
time.sleep(2)
|
||||
tdSql.query("select * from sta")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query("select tbname from sta order by tbname")
|
||||
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'):
|
||||
tdLog.exit("error1")
|
||||
|
||||
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'):
|
||||
tdLog.exit("error2")
|
||||
|
||||
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'):
|
||||
tdLog.exit("error3")
|
||||
|
||||
tdSql.query("select * from stb")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.query("select tbname from stb order by tbname")
|
||||
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'):
|
||||
tdLog.exit("error4")
|
||||
|
||||
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'):
|
||||
tdLog.exit("error5")
|
||||
|
||||
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'):
|
||||
tdLog.exit("error6")
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
self.case1()
|
||||
# gen data
|
||||
random.seed(int(time.time()))
|
||||
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
|
||||
# create stream
|
||||
tdSql.execute("use db")
|
||||
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
|
||||
tdSql.execute("create stream stream3 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
|
||||
sql = "select count(*) from sta"
|
||||
# loop wait max 60s to check count is ok
|
||||
tdLog.info("loop wait result ...")
|
||||
|
|
|
@ -18,7 +18,7 @@ IF (TD_WEBSOCKET)
|
|||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND cargo update
|
||||
COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features native-tls
|
||||
COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features rustls
|
||||
INSTALL_COMMAND
|
||||
COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
|
||||
|
@ -37,7 +37,7 @@ IF (TD_WEBSOCKET)
|
|||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND cargo update
|
||||
COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored
|
||||
COMMAND cargo build --release -p taos-ws-sys --features rustls
|
||||
INSTALL_COMMAND
|
||||
COMMAND cp target/release/taosws.dll ${CMAKE_BINARY_DIR}/build/lib
|
||||
COMMAND cp target/release/taosws.dll.lib ${CMAKE_BINARY_DIR}/build/lib/taosws.lib
|
||||
|
@ -57,7 +57,7 @@ IF (TD_WEBSOCKET)
|
|||
COMMAND git clean -f -d
|
||||
BUILD_COMMAND
|
||||
COMMAND cargo update
|
||||
COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored
|
||||
COMMAND cargo build --release -p taos-ws-sys --features rustls
|
||||
INSTALL_COMMAND
|
||||
COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib
|
||||
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
|
||||
|
|
Loading…
Reference in New Issue