Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0

This commit is contained in:
54liuyao 2024-04-03 09:54:10 +08:00
commit 537395e845
23 changed files with 506 additions and 299 deletions

View File

@ -0,0 +1,66 @@
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let dsn = "taos://localhost:6030";
let builder = TaosBuilder::from_dsn(dsn)?;
let taos = builder.build()?;
// ANCHOR: create_db_and_table
let db = "power";
// create database
taos.exec_many([
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("USE `{db}`"),
])
.await?;
// create table
taos.exec_many([
// create super table
"CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
TAGS (`groupid` INT, `location` BINARY(24))",
]).await?;
// ANCHOR_END: create_db_and_table
// ANCHOR: insert_data
let inserted = taos.exec("INSERT INTO " +
"power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 219, 0.31000) " +
"(NOW + 2a, 12.60000, 218, 0.33000) " +
"(NOW + 3a, 12.30000, 221, 0.31000) " +
"power.d1002 USING power.meters TAGS(3, 'California.SanFrancisco') " +
"VALUES " +
"(NOW + 1a, 10.30000, 218, 0.25000) ").await?;
println!("inserted: {} rows", inserted);
// ANCHOR_END: insert_data
// ANCHOR: query_data
let mut result = taos.query("SELECT * FROM power.meters").await?;
for field in result.fields() {
println!("got field: {}", field.name());
}
let mut rows = result.rows();
let mut nrows = 0;
while let Some(row) = rows.try_next().await? {
for (col, (name, value)) in row.enumerate() {
println!(
"[{}] got value in col {} (named `{:>8}`): {}",
nrows, col, name, value
);
}
nrows += 1;
}
// ANCHOR_END: query_data
// ANCHOR: query_with_req_id
let result = taos.query_with_req_id("SELECT * FROM power.meters", 0).await?;
// ANCHOR_END: query_with_req_id
}

View File

@ -0,0 +1,80 @@
use taos_query::common::SchemalessPrecision;
use taos_query::common::SchemalessProtocol;
use taos_query::common::SmlDataBuilder;
use crate::AsyncQueryable;
use crate::AsyncTBuilder;
use crate::TaosBuilder;
async fn put() -> anyhow::Result<()> {
std::env::set_var("RUST_LOG", "taos=debug");
pretty_env_logger::init();
let dsn =
std::env::var("TDENGINE_ClOUD_DSN").unwrap_or("http://localhost:6041".to_string());
log::debug!("dsn: {:?}", &dsn);
let client = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
client.exec(format!("drop database if exists {db}")).await?;
client
.exec(format!("create database if not exists {db}"))
.await?;
// should specify database before insert
client.exec(format!("use {db}")).await?;
// SchemalessProtocol::Line
let data = [
"meters,groupid=2,location=California.SanFrancisco current=10.3000002f64,voltage=219i32,phase=0.31f64 1626006833639000000",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Line)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(100u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
// SchemalessProtocol::Telnet
let data = [
"meters.current 1648432611249 10.3 location=California.SanFrancisco group=2",
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Telnet)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(200u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
// SchemalessProtocol::Json
let data = [
r#"[{"metric": "meters.current", "timestamp": 1681345954000, "value": 10.3, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611249, "value": 219, "tags": {"location": "California.LosAngeles", "groupid": 1}}, {"metric": "meters.current", "timestamp": 1648432611250, "value": 12.6, "tags": {"location": "California.SanFrancisco", "groupid": 2}}, {"metric": "meters.voltage", "timestamp": 1648432611250, "value": 221, "tags": {"location": "California.LosAngeles", "groupid": 1}}]"#
]
.map(String::from)
.to_vec();
let sml_data = SmlDataBuilder::default()
.protocol(SchemalessProtocol::Json)
.precision(SchemalessPrecision::Millisecond)
.data(data.clone())
.ttl(1000)
.req_id(300u64)
.build()?;
assert_eq!(client.put(&sml_data).await?, ());
client.exec(format!("drop database if exists {db}")).await?;
Ok(())
}

View File

@ -0,0 +1,37 @@
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let taos = TaosBuilder::from_dsn("taos://")?.build().await?;
taos.exec("DROP DATABASE IF EXISTS power").await?;
taos.create_database("power").await?;
taos.use_database("power").await?;
taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
let mut stmt = Stmt::init(&taos).await?;
stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)").await?;
const NUM_TABLES: usize = 10;
const NUM_ROWS: usize = 10;
for i in 0..NUM_TABLES {
let table_name = format!("d{}", i);
let tags = vec![Value::VarChar("California.SanFransico".into()), Value::Int(2)];
stmt.set_tbname_tags(&table_name, &tags).await?;
for j in 0..NUM_ROWS {
let values = vec![
ColumnView::from_millis_timestamp(vec![1648432611249 + j as i64]),
ColumnView::from_floats(vec![10.3 + j as f32]),
ColumnView::from_ints(vec![219 + j as i32]),
ColumnView::from_floats(vec![0.31 + j as f32]),
];
stmt.bind(&values).await?;
}
stmt.add_batch().await?;
}
// execute.
let rows = stmt.execute().await?;
assert_eq!(rows, NUM_TABLES * NUM_ROWS);
Ok(())
}

View File

@ -0,0 +1,166 @@
use std::time::Duration;
use std::str::FromStr;
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::formatted_timed_builder()
.filter_level(log::LevelFilter::Info)
.init();
use taos_query::prelude::*;
let dsn = "taos://localhost:6030".to_string();
log::info!("dsn: {}", dsn);
let mut dsn = Dsn::from_str(&dsn)?;
let taos = TaosBuilder::from_dsn(&dsn)?.build().await?;
// prepare database and table
taos.exec_many([
"drop topic if exists topic_meters",
"drop database if exists power",
"create database if not exists power WAL_RETENTION_PERIOD 86400",
"use power",
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (groupId INT, location BINARY(24))",
"create table if not exists power.d001 using power.meters tags(1,'location')",
])
.await?;
taos.exec_many([
"drop database if exists db2",
"create database if not exists db2 wal_retention_period 3600",
"use db2",
])
.await?;
// ANCHOR: create_topic
taos.exec_many([
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM power.meters",
])
.await?;
// ANCHOR_END: create_topic
// ANCHOR: create_consumer
dsn.params.insert("group.id".to_string(), "abc".to_string());
dsn.params.insert("auto.offset.reset".to_string(), "earliest".to_string());
let builder = TmqBuilder::from_dsn(&dsn)?;
let mut consumer = builder.build().await?;
// ANCHOR_END: create_consumer
// ANCHOR: subscribe
consumer.subscribe(["topic_meters"]).await?;
// ANCHOR_END: subscribe
// ANCHOR: consume
{
let mut stream = consumer.stream_with_timeout(Timeout::from_secs(1));
while let Some((offset, message)) = stream.try_next().await? {
let topic: &str = offset.topic();
let database = offset.database();
let vgroup_id = offset.vgroup_id();
log::debug!(
"topic: {}, database: {}, vgroup_id: {}",
topic,
database,
vgroup_id
);
match message {
MessageSet::Meta(meta) => {
log::info!("Meta");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
}
MessageSet::Data(data) => {
log::info!("Data");
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
MessageSet::MetaData(meta, data) => {
log::info!("MetaData");
let raw = meta.as_raw_meta().await?;
taos.write_raw_meta(&raw).await?;
let json = meta.as_json_meta().await?;
let sql = json.to_string();
if let Err(err) = taos.exec(sql).await {
println!("maybe error: {}", err);
}
while let Some(data) = data.fetch_raw_block().await? {
log::debug!("data: {:?}", data);
}
}
}
consumer.commit(offset).await?;
}
}
// ANCHOR_END: consume
// ANCHOR: assignments
let assignments = consumer.assignments().await.unwrap();
log::info!("assignments: {:?}", assignments);
// ANCHOR_END: assignments
// seek offset
for topic_vec_assignment in assignments {
let topic = &topic_vec_assignment.0;
let vec_assignment = topic_vec_assignment.1;
for assignment in vec_assignment {
let vgroup_id = assignment.vgroup_id();
let current = assignment.current_offset();
let begin = assignment.begin();
let end = assignment.end();
log::debug!(
"topic: {}, vgroup_id: {}, current offset: {} begin {}, end: {}",
topic,
vgroup_id,
current,
begin,
end
);
// ANCHOR: seek_offset
let res = consumer.offset_seek(topic, vgroup_id, end).await;
if res.is_err() {
log::error!("seek offset error: {:?}", res);
let a = consumer.assignments().await.unwrap();
log::error!("assignments: {:?}", a);
}
// ANCHOR_END: seek_offset
}
let topic_assignment = consumer.topic_assignment(topic).await;
log::debug!("topic assignment: {:?}", topic_assignment);
}
// after seek offset
let assignments = consumer.assignments().await.unwrap();
log::info!("after seek offset assignments: {:?}", assignments);
// ANCHOR: unsubscribe
consumer.unsubscribe().await;
// ANCHOR_END: unsubscribe
tokio::time::sleep(Duration::from_secs(1)).await;
taos.exec_many([
"drop database db2",
"drop topic topic_meters",
"drop database power",
])
.await?;
Ok(())
}

View File

@ -85,6 +85,7 @@ typedef struct SColumnNode {
char colName[TSDB_COL_NAME_LEN];
int16_t dataBlockId;
int16_t slotId;
int16_t numOfPKs;
bool tableHasPk;
bool isPk;
} SColumnNode;

View File

@ -76,6 +76,7 @@ typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
col_id_t numOfColumns; // the number of columns
int16_t numOfPKs;
int32_t rowSize; // row size of the schema
} STableComInfo;

View File

@ -766,6 +766,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_SECOND_COL_PK TAOS_DEF_ERROR_CODE(0, 0x2672)
#define TSDB_CODE_PAR_COL_PK_TYPE TAOS_DEF_ERROR_CODE(0, 0x2673)
#define TSDB_CODE_PAR_INVALID_PK_OP TAOS_DEF_ERROR_CODE(0, 0x2674)
#define TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL TAOS_DEF_ERROR_CODE(0, 0x2675)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -1061,7 +1061,6 @@ _exit:
static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag) {
int32_t code = 0;
SKVIdx *pKVIdx = (SKVIdx *)pRow->data;
uint8_t *pv = NULL;
int32_t iColData = 0;
SColData *pColData = &aColData[iColData];
@ -1069,6 +1068,14 @@ static int32_t tRowKVUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aCo
STColumn *pTColumn = &pTSchema->columns[iTColumn];
int32_t iCol = 0;
// primary keys
uint8_t *data = pRow->data;
SPrimaryKeyIndex index;
for (int32_t i = 0; i < pRow->numOfPKs; i++) {
data += tGetPrimaryKeyIndex(data, &index);
}
SKVIdx *pKVIdx = (SKVIdx *)data;
if (pRow->flag & KV_FLG_LIT) {
pv = pKVIdx->idx + pKVIdx->nCol;
} else if (pRow->flag & KV_FLG_MID) {

View File

@ -292,16 +292,16 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
if (tlen <= 0){
return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_TMQ_INVALID_MSG;
}
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){
if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
rpcFreeCont(buf);
return TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_TMQ_INVALID_MSG;
}
pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen;
@ -316,7 +316,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = NULL;
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}

View File

@ -107,7 +107,21 @@ int32_t pkCompEx(__compar_fn_t comparFn, SRowKey* p1, SRowKey* p2) {
if (p1->numOfPKs == 0) {
return 0;
} else {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
if (IS_VAR_DATA_TYPE(p1->pks[0].type)) {
int32_t len = TMIN(p1->pks[0].nData, p2->pks[0].nData);
int32_t ret = strncmp((char*)p1->pks[0].pData, (char*)p2->pks[0].pData, len);
if (ret == 0) {
if (p1->pks[0].nData == p2->pks[0].nData) {
return 0;
} else {
return p1->pks[0].nData > p2->pks[0].nData?1:-1;
}
} else {
return ret > 0? 1:-1;
}
} else {
return comparFn(&p1->pks[0].val, &p2->pks[0].val);
}
}
}

View File

@ -217,6 +217,11 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type);
if (colDataIsNull_s(pColInfoData, j)) {
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
taosArrayPush(pVals, &cv);
} else {
@ -240,10 +245,15 @@ int32_t buildSubmitReqFromBlock(SDataInserterHandle* pInserter, SSubmitReq2** pp
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
if (colDataIsNull_s(pColInfoData, j)) {
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
qError("NULL value for primary key");
qError("Primary timestamp column should not be null");
terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL;
goto _end;
}
if ((pCol->flags & COL_IS_KEY)) {
qError("Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, pCol->colId, pCol->type);
terrno = TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL;
goto _end;
}
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type
taosArrayPush(pVals, &cv);

View File

@ -359,8 +359,8 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
}
static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex,
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
STimeWindow* win) {
int32_t nextRowIndex, SArray* pDataBlock, const TSKEY* tsCols,
TSKEY blockEkey, STimeWindow* win) {
int32_t order = pInfo->binfo.inputTsOrder;
TSKEY actualEndKey = tsCols[endRowIndex];
@ -378,7 +378,6 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx
return true;
}
int32_t nextRowIndex = endRowIndex + 1;
ASSERT(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex];
@ -496,7 +495,6 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
ASSERT(pBlock != NULL);
if (pBlock->pDataBlock == NULL) {
// tscError("pBlock->pDataBlock == NULL");
return;
}
@ -514,17 +512,26 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
}
// point interpolation does not require the end key time window interpolation.
// if (pointInterpQuery) {
// return;
// }
// interpolation query does not generate the time window end interpolation
done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + forwardRows - 1;
int32_t nextRowIndex = endRowIndex + 1;
// duplicated ts row does not involve in the interpolation of end value for current time window
int32_t x = endRowIndex;
while(x >= 0) {
if (tsCols[x] == tsCols[x-1]) {
x -= 1;
} else {
endRowIndex = x;
break;
}
}
TSKEY endKey = (pInfo->binfo.inputTsOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, nextRowIndex, pBlock->pDataBlock, tsCols, endKey, win);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}

View File

@ -533,6 +533,24 @@ bool funcInputGetNextRowDescPk(SFuncInputRowIter* pIter, SFuncInputRow* pRow) {
}
}
static void forwardToNextDiffTsRow(SFuncInputRowIter* pIter, int32_t rowIndex) {
int32_t idx = rowIndex + 1;
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[rowIndex]) {
++idx;
}
pIter->rowIndex = idx;
}
static void setInputRowInfo(SFuncInputRow* pRow, SFuncInputRowIter* pIter, int32_t rowIndex, bool setPk) {
pRow->ts = pIter->tsList[rowIndex];
pRow->ts = pIter->tsList[rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, rowIndex);
pRow->pPk = setPk? colDataGetData(pIter->pPkCol, rowIndex):NULL;
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = rowIndex;
}
bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->hasPrev) {
if (pIter->prevBlockTsEnd == pIter->tsList[pIter->inputEndIndex]) {
@ -543,33 +561,19 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
while (pIter->tsList[idx] == pIter->prevBlockTsEnd) {
++idx;
}
pRow->ts = pIter->tsList[idx];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, idx);
pRow->pData = colDataGetData(pIter->pDataCol, idx);
pRow->pPk = colDataGetData(pIter->pPkCol, idx);
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = idx;
pIter->hasPrev = false;
pIter->rowIndex = idx + 1;
setInputRowInfo(pRow, pIter, idx, true);
forwardToNextDiffTsRow(pIter, idx);
return true;
}
} else {
if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = colDataGetData(pIter->pPkCol, pIter->rowIndex);
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex;
setInputRowInfo(pRow, pIter, pIter->rowIndex, true);
TSKEY tsEnd = pIter->tsList[pIter->inputEndIndex];
if (pIter->tsList[pIter->rowIndex] != tsEnd) {
int32_t idx = pIter->rowIndex + 1;
while (idx <= pIter->inputEndIndex && pIter->tsList[idx] == pIter->tsList[pIter->rowIndex]) {
++idx;
}
pIter->rowIndex = idx;
forwardToNextDiffTsRow(pIter, pIter->rowIndex);
} else {
pIter->rowIndex = pIter->inputEndIndex + 1;
}
@ -585,13 +589,7 @@ bool funcInputGetNextRowAscPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
bool funcInputGetNextRowNoPk(SFuncInputRowIter *pIter, SFuncInputRow* pRow) {
if (pIter->rowIndex <= pIter->inputEndIndex) {
pRow->ts = pIter->tsList[pIter->rowIndex];
pRow->isDataNull = colDataIsNull_f(pIter->pDataCol->nullbitmap, pIter->rowIndex);
pRow->pData = colDataGetData(pIter->pDataCol, pIter->rowIndex);
pRow->pPk = NULL;
pRow->block = pIter->pSrcBlock;
pRow->rowIndex = pIter->rowIndex;
setInputRowInfo(pRow, pIter, pIter->rowIndex, false);
++pIter->rowIndex;
return true;
} else {
@ -2821,11 +2819,13 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
pOutput->isNull = pInput->isNull;
pOutput->ts = pInput->ts;
pOutput->bytes = pInput->bytes;
pOutput->pkType = pInput->pkType;
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
if (pInput->pkData) {
pOutput->pkBytes = pInput->pkBytes;
memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes);
pOutput->pkData = pOutput->buf + pOutput->bytes;
}
return TSDB_CODE_SUCCESS;
}
@ -5580,8 +5580,6 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STwaInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);

View File

@ -183,6 +183,8 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
pBoundInfo->numOfBound = 0;
bool hasPK = pTableMeta->tableInfo.numOfPKs;
int16_t numOfBoundPKs = 0;
int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
while (TSDB_CODE_SUCCESS == code) {
@ -219,14 +221,20 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
if (hasPK && (pSchema[index].flags & COL_IS_KEY)) ++numOfBoundPKs;
}
}
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType) && !pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "primary timestamp column can not be null");
if (TSDB_CODE_SUCCESS == code && (BOUND_TAGS != boundColsType)) {
if (!pUseCols[0]) {
code = buildInvalidOperationMsg(&pCxt->msg, "Primary timestamp column should not be null");
}
if (numOfBoundPKs != pTableMeta->tableInfo.numOfPKs) {
code = buildInvalidOperationMsg(&pCxt->msg, "Primary key column should not be none");
}
}
if (TSDB_CODE_SUCCESS == code && (BOUND_ALL_AND_TBNAME == boundColsType) && !pUseCols[tbnameSchemaIndex]) {
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column can not be null");
code = buildInvalidOperationMsg(&pCxt->msg, "tbname column should not be null");
}
taosMemoryFree(pUseCols);
@ -469,13 +477,15 @@ static int32_t parseTagToken(const char** end, SToken* pToken, SSchema* pSchema,
char* endptr = NULL;
int32_t code = TSDB_CODE_SUCCESS;
#if 0
if (isNullValue(pSchema->type, pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(pMsgBuf, "primary timestamp should not be null", pToken->z);
return buildSyntaxErrMsg(pMsgBuf, "Primary timestamp column can not be null", pToken->z);
}
return TSDB_CODE_SUCCESS;
}
#endif
// strcpy(val->colName, pSchema->name);
val->cid = pSchema->colId;
@ -1643,7 +1653,10 @@ static int32_t parseValueToken(SInsertParseContext* pCxt, const char** pSql, STo
int32_t code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg, pSchema->type);
if (TSDB_CODE_SUCCESS == code && isNullValue(pSchema->type, pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
return buildSyntaxErrMsg(&pCxt->msg, "primary timestamp should not be null", pToken->z);
return buildSyntaxErrMsg(&pCxt->msg, "Primary timestamp column should not be null", pToken->z);
}
if (pSchema->flags & COL_IS_KEY) {
return buildSyntaxErrMsg(&pCxt->msg, "Primary key column should not be null", pToken->z);
}
pVal->flag = CV_FLAG_NULL;

View File

@ -267,6 +267,11 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
pBind = bind + c;
}
if(pBind->is_null && (pColSchema->flags & COL_IS_KEY)){
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
code = tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
if (code) {
goto _return;
@ -313,7 +318,12 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
pBind = bind;
}
tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE: -1);
if (pBind->is_null && (pColSchema->flags & COL_IS_KEY)) {
code = buildInvalidOperationMsg(&pBuf, "Primary key column should not be null");
goto _return;
}
tColDataAddValueByBind(pCol, pBind, IS_VAR_DATA_TYPE(pColSchema->type) ? pColSchema->bytes - VARSTR_HEADER_SIZE : -1);
qDebug("stmt col %d bind %d rows data", colIdx, rowNum);

View File

@ -955,6 +955,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
}
pCol->tableHasPk = hasPkInTable(pTable->pMeta);
pCol->isPk = (pCol->tableHasPk) && (pColSchema->flags & COL_IS_KEY);
pCol->numOfPKs = pTable->pMeta->tableInfo.numOfPKs;
}
static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) {
@ -5089,9 +5090,11 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns");
}
SNode* pPrimaryKeyExpr = NULL;
SNode* pBoundCol = NULL;
SNode* pProj = NULL;
SNode* pPrimaryKeyExpr = NULL;
SNode* pBoundCol = NULL;
SNode* pProj = NULL;
int16_t numOfTargetPKs = 0;
int16_t numOfBoundPKs = 0;
FORBOTH(pBoundCol, pInsert->pCols, pProj, pProjects) {
SColumnNode* pCol = (SColumnNode*)pBoundCol;
SExprNode* pExpr = (SExprNode*)pProj;
@ -5107,12 +5110,18 @@ static int32_t translateInsertProject(STranslateContext* pCxt, SInsertStmt* pIns
snprintf(pExpr->aliasName, sizeof(pExpr->aliasName), "%s", pCol->colName);
if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
pPrimaryKeyExpr = (SNode*)pExpr;
numOfTargetPKs = pCol->numOfPKs;
}
if (pCol->isPk) ++numOfBoundPKs;
}
if (NULL == pPrimaryKeyExpr) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMNS_NUM,
"Primary timestamp column can not be null");
"Primary timestamp column should not be null");
}
if (numOfBoundPKs != numOfTargetPKs) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "Primary key column should not be none");
}
return addOrderByPrimaryKeyToQuery(pCxt, pPrimaryKeyExpr, pInsert->pQuery);

View File

@ -294,6 +294,7 @@ static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema, const STa
pCol->colType = COLUMN_TYPE_COLUMN;
pCol->isPk = pSchema->flags & COL_IS_KEY;
pCol->tableHasPk = hasPkInTable(pMeta);
pCol->numOfPKs = pMeta->tableInfo.numOfPKs;
strcpy(pCol->colName, pSchema->name);
return (SNode*)pCol;
}

View File

@ -421,8 +421,16 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta *
memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY);
for (int32_t i = 0; i < msg->numOfColumns; ++i) {
pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
if (hasPK && (i > 0)) {
if ((pTableMeta->schema[i].flags & COL_IS_KEY)) {
++pTableMeta->tableInfo.numOfPKs;
} else {
hasPK = false;
}
}
}
qDebug("table %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s stb %s suid %" PRIx64 " sver %d tver %d"

View File

@ -278,6 +278,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.numOfNotReady = 0;
pTask->chkInfo.transId = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false;
pTask->chkInfo.downstreamAlignNum = 0;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {

View File

@ -628,6 +628,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TAG_IS_PRIMARY_KEY, "tag can not be prim
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SECOND_COL_PK, "primary key must be second column")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_PK_TYPE, "primary key column must be of type int, uint, bigint, ubigint, and varchar")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PK_OP, "primary key column can not be added, modified, and dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PRIMARY_KEY_IS_NULL, "Primary key column should not be null")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")
//planner

View File

@ -96,9 +96,28 @@ class TDTestCase:
time.sleep(2)
tdSql.query("select * from sta")
tdSql.checkRows(3)
tdSql.query("select tbname from sta order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'):
tdLog.exit("error1")
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'):
tdLog.exit("error2")
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'):
tdLog.exit("error3")
tdSql.query("select * from stb")
tdSql.checkRows(3)
tdSql.query("select tbname from stb order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'):
tdLog.exit("error4")
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'):
tdLog.exit("error5")
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'):
tdLog.exit("error6")
# run
def run(self):
self.case1()

View File

@ -1,243 +0,0 @@
# 写一段python代码生成一个JSON串json 串为数组数组长度为10000每个元素为包含4000个key-value对的JSON字符串json 数组里每个元素里的4000个key不相同元素之间使用相同的keykey值为英文单词value 为int值且value 的范围是[0, 256]。把json串紧凑形式写入文件把json串存入parquet文件中把json串写入avro文件中,把json串写入到postgre sql表中表有两列第一列主int类型主键第二列为json类型数组的每个元素写入json类型里
import csv
import json
import os
import random
import string
import time
from faker import Faker
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro
import psycopg2
from psycopg2.extras import Json
def get_dir_size(start_path='.'):
total = 0
for dirpath, dirs, files in os.walk(start_path):
for f in files:
fp = os.path.join(dirpath, f)
# 获取文件大小并累加到total上
total += os.path.getsize(fp)
return total
def to_avro_record(obj):
return {key: value for key, value in obj.items()}
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def generate_random_values(t):
if t == 0:
return random.randint(-255, 256)
elif t == 1:
return random.randint(-2100000000, 2100000000)
elif t == 2:
return random.uniform(-10000.0, 10000.0)
elif t == 3:
return generate_random_string(10)
elif t == 4:
return random.choice([True, False])
def generate_json_object(key_set, value_set):
values = [generate_random_values(t) for t in value_set]
return dict(zip(key_set, values))
def generate_json_array(keys, values, array_length):
return [generate_json_object(keys, values) for _ in range(array_length)]
def write_parquet_file(parquet_file, json_array):
df = pd.DataFrame(json_array)
table = pa.Table.from_pandas(df)
pq.write_table(table, parquet_file + ".parquet")
def write_json_file(json_file, json_array):
with open(json_file + ".json", 'w') as f:
json.dump(json_array, f, separators=(',', ':'))
def generate_avro_schema(k, t):
if t == 0:
return {"name": k, "type": "int", "logicalType": "int"}
elif t == 1:
return {"name": k, "type": "int", "logicalType": "int"}
elif t == 2:
return {"name": k, "type": "float"}
elif t == 3:
return {"name": k, "type": "string"}
elif t == 4:
return {"name": k, "type": "boolean"}
def write_avro_file(avro_file, json_array, keys, values):
k = list(json_array[0].keys())
if keys != k:
raise ValueError("keys and values should have the same length")
avro_schema = {
"type": "record",
"name": "MyRecord",
"fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()]
}
avro_records = [to_avro_record(obj) for obj in json_array]
with open(avro_file + ".avro", 'wb') as f:
fastavro.writer(f, avro_schema, avro_records)
def write_pg_file(json_array):
conn_str = "dbname=mydatabase user=myuser host=localhost"
conn = psycopg2.connect(conn_str)
cur = conn.cursor()
cur.execute("drop table if exists my_table")
conn.commit()
# 创建表(如果不存在)
cur.execute("""
CREATE TABLE IF NOT EXISTS my_table (
id SERIAL PRIMARY KEY,
json_data JSONB
);
""")
conn.commit()
# 执行SQL查询
cur.execute("SELECT count(*) FROM my_table")
# 获取查询结果
rows = cur.fetchall()
# 打印查询结果
for row in rows:
print("rows before:", row[0])
# 插入数据
for idx, json_obj in enumerate(json_array):
# print(json.dumps(json_obj))
cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),))
conn.commit() # 提交事务
# 执行SQL查询
cur.execute("SELECT count(*) FROM my_table")
# 获取查询结果
rows = cur.fetchall()
# 打印查询结果
for row in rows:
print("rows after:", row[0])
# # 执行SQL查询
# cur.execute("SELECT pg_relation_size('my_table')")
# # 获取查询结果
# rows = cur.fetchall()
# # 打印查询结果
# size = 0
# for row in rows:
# size = row[0]
# print("table size:", row[0])
# 关闭游标和连接
cur.close()
conn.close()
def read_parquet_file(parquet_file):
table = pq.read_table(parquet_file + ".parquet")
df = table.to_pandas()
print(df)
def read_avro_file(avg_file):
with open(avg_file + ".avro", 'rb') as f:
reader = fastavro.reader(f)
for record in reader:
print(record)
def read_json_file(csv_file):
with open(csv_file + ".json", 'r') as f:
data = json.load(f)
print(data)
def main():
key_length = 7
key_sizes = 4000
row_sizes = 10000
file_name = "output"
# cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)]
cases = [(2, 2), (3, 3), (0, 4)]
for data in cases:
begin, end = data
print(f"执行类型:{begin}-{end}")
N = 2
for _ in range(N):
t0 = time.time()
keys = [generate_random_string(key_length) for _ in range(key_sizes)]
values = [random.randint(begin, end) for _ in range(key_sizes)]
# 生成JSON数组
json_array = generate_json_array(keys, values, row_sizes)
t1 = time.time()
write_json_file(file_name, json_array)
t2 = time.time()
write_parquet_file(file_name, json_array)
t3 = time.time()
write_avro_file(file_name, json_array, keys, values)
t4 = time.time()
size = write_pg_file(json_array)
t5 = time.time()
print("生成json 速度:", t2 - t0, "文件大小:", os.path.getsize(file_name + ".json"))
print("parquet 速度:", t3 - t2, "文件大小:", os.path.getsize(file_name + ".parquet"))
print("avro 速度:", t4 - t3, "文件大小:", os.path.getsize(file_name + ".avro"))
print("pg json 速度:", t5 - t4, "文件大小:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024)
# read_json_file(file_name)
# read_parquet_file(file_name)
# read_avro_file(file_name)
print(f"\n---------------\n")
if __name__ == "__main__":
main()
# 压缩文件
# import os
#
# import lz4.frame
#
#
# files =["output.json", "output.parquet", "output.avro"]
# def compress_file(input_path, output_path):
# with open(input_path, 'rb') as f_in:
# compressed_data = lz4.frame.compress(f_in.read())
#
# with open(output_path, 'wb') as f_out:
# f_out.write(compressed_data)
#
# for file in files:
# compress_file(file, file + ".lz4")
# print(file, "origin size:", os.path.getsize(file), " after lsz size:", os.path.getsize(file + ".lz4"))

View File

@ -18,7 +18,7 @@ IF (TD_WEBSOCKET)
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND cargo update
COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features native-tls
COMMAND RUSTFLAGS=-Ctarget-feature=-crt-static cargo build --release -p taos-ws-sys --features rustls
INSTALL_COMMAND
COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include
@ -37,7 +37,7 @@ IF (TD_WEBSOCKET)
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND cargo update
COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored
COMMAND cargo build --release -p taos-ws-sys --features rustls
INSTALL_COMMAND
COMMAND cp target/release/taosws.dll ${CMAKE_BINARY_DIR}/build/lib
COMMAND cp target/release/taosws.dll.lib ${CMAKE_BINARY_DIR}/build/lib/taosws.lib
@ -57,7 +57,7 @@ IF (TD_WEBSOCKET)
COMMAND git clean -f -d
BUILD_COMMAND
COMMAND cargo update
COMMAND cargo build --release -p taos-ws-sys --features native-tls-vendored
COMMAND cargo build --release -p taos-ws-sys --features rustls
INSTALL_COMMAND
COMMAND cp target/release/${websocket_lib_file} ${CMAKE_BINARY_DIR}/build/lib
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/build/include