fix: remove tdengine case to python-connector repo

This commit is contained in:
Alex Duan 2024-12-30 15:25:25 +08:00
parent e6ec05e155
commit f0627fd7e8
3 changed files with 4 additions and 632 deletions

View File

@ -1,626 +0,0 @@
# encoding:UTF-8
from ctypes import *
from datetime import datetime
# geometry support
from shapely.wkb import dumps, loads
from shapely.wkt import dumps as wkt_dumps, loads as wkt_loads
import taos
import math
import traceback
from taos.statement2 import *
from taos.constants import FieldType
from taos import log
from taos import bind2
# input WKT return WKB (bytes object)
def WKB(wkt, hex = False):
if wkt is None:
return None
wkb = wkt_loads(wkt)
wkb_bytes = dumps(wkb, hex)
return wkb_bytes
def compareLine(oris, rows):
n = len(oris)
if len(rows) != n:
return False
log.debug(f" len is {n} oris={oris} rows={rows}")
for i in range(n):
if oris[i] != rows[i]:
if type(rows[i]) == bool:
if bool(oris[i]) != rows[i]:
log.debug1(f" diff bool i={i} oris[i] == rows[i] {oris[i]} == {rows[i]}")
return False
else:
log.debug1(f" float i={i} oris[i] == rows[i] {oris[i]} == {rows[i]}")
elif type(rows[i]) == float:
if math.isclose(oris[i], rows[i], rel_tol=1e-3) is False:
log.debug1(f" diff float i={i} oris[i] == rows[i] {oris[i]} == {rows[i]}")
return False
else:
log.debug1(f" float i={i} oris[i] == rows[i] {oris[i]} == {rows[i]}")
else:
log.debug1(f" diff i={i} oris[i] == rows[i] {oris[i]} == {rows[i]}")
return False
else:
log.debug1(f" i={i} oris[i] == rows[i] {oris[i]} == {rows[i]}")
return True
def checkResultCorrect(conn, sql, tagsTb, datasTb):
# column to rows
log.debug(f"check sql correct: {sql}\n")
oris = []
ncol = len(datasTb)
nrow = len(datasTb[0])
for i in range(nrow):
row = []
for j in range(ncol):
if j == 0:
# ts column
c0 = datasTb[j][i]
if type(c0) is int :
row.append(datasTb[j][i])
else:
ts = int(bind2.datetime_to_timestamp(c0, PrecisionEnum.Milliseconds).value)
row.append(ts)
else:
row.append(datasTb[j][i])
if tagsTb is not None:
row += tagsTb
oris.append(row)
# fetch all
lres = []
log.debug(sql)
res = conn.query(sql)
i = 0
for row in res:
lrow = list(row)
lrow[0] = int(lrow[0].timestamp()*1000)
if compareLine(oris[i], lrow) is False:
log.info(f"insert data differet. i={i} expect ori data={oris[i]} query from db ={lrow}")
raise(BaseException("check insert data correct failed."))
else:
log.debug(f"i={i} origin data same with get from db\n")
log.debug(f" origin data = {oris[i]} \n")
log.debug(f" get from db = {lrow} \n")
i += 1
def checkResultCorrects(conn, dbname, stbname, tbnames, tags, datas):
count = len(tbnames)
for i in range(count):
if stbname is None:
sql = f"select * from {dbname}.{tbnames[i]} "
else:
sql = f"select * from {dbname}.{stbname} where tbname='{tbnames[i]}' "
checkResultCorrect(conn, sql, tags[i], datas[i])
print("insert data check correct ..................... ok\n")
def prepare(conn, dbname, stbname, ntb1, ntb2):
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s precision 'ms' " % dbname)
conn.select_db(dbname)
# stable
sql = f"create table if not exists {dbname}.{stbname}(ts timestamp, name binary(32), sex bool, score int) tags(grade nchar(8), class int)"
conn.execute(sql)
# normal table
sql = f"create table if not exists {dbname}.{ntb1} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128))"
conn.execute(sql)
sql = f"create table if not exists {dbname}.{ntb2} (ts timestamp, name varbinary(32), sex bool, score float, geo geometry(128))"
conn.execute(sql)
# performace is high
def insert_bind_param(conn, stmt2, dbname, stbname):
#
# table info , write 5 lines to 3 child tables d0, d1, d2 with super table
#
tbnames = ["d1","d2","d3"]
tags = [
["grade1", 1],
["grade1", None],
[None , 3]
]
datas = [
# class 1
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004,1601481600005],
["Mary", "Tom", "Jack", "Jane", "alex" ,None ],
[0, 1, 1, 0, 1 ,None ],
[98, 80, 60, 100, 99 ,None ]
],
# class 2
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004,1601481600005],
["Mary2", "Tom2", "Jack2", "Jane2", "alex2" ,None ],
[0, 1, 1, 0, 1 ,0 ],
[298, 280, 260, 2100, 299 ,None ]
],
# class 3
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004,1601481600005],
["Mary3", "Tom3", "Jack3", "Jane3", "alex3" ,"Mark" ],
[0, 1, 1, 0, 1 ,None ],
[398, 380, 360, 3100, 399 ,None ]
]
]
stmt2.bind_param(tbnames, tags, datas)
stmt2.execute()
# check correct
checkResultCorrects(conn, dbname, stbname, tbnames, tags, datas)
def insert_bind_param_normal_tables(conn, stmt2, dbname, ntb):
tbnames = [ntb]
tags = None
wkts = [None, b"POINT(121.213 31.234)", b"POINT(122.22 32.222)", None, b"POINT(124.22 34.222)"]
wkbs = [WKB(wkt) for wkt in wkts]
datas = [
# table 1
[
# student
[1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)],
[b"Mary", b"tom", b"Jack", b"Jane", None ],
[0, 3.14, True, 0, 1 ],
[98, 99.87, 60, 100, 99 ],
wkbs
]
]
stmt2.bind_param(tbnames, tags, datas)
stmt2.execute()
# check correct
checkResultCorrects(conn, dbname, None, tbnames, [None], datas)
def insert_bind_param_with_table(conn, stmt2, dbname, stbname, ctb):
tbnames = None
tags = [
["grade2", 1]
]
# prepare data
datas = [
# table 1
[
# student
[1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)],
["Mary", "Tom", "Jack", "Jane", "alex" ],
[0, 1, 1, 0, 1 ],
[98, 80, 60, 100, 99 ]
]
]
stmt2.bind_param(tbnames, tags, datas)
stmt2.execute()
# check correct
checkResultCorrects(conn, dbname, stbname, [ctb], tags, datas)
# insert with single table (performance is lower)
def insert_bind_param_with_tables(conn, stmt2, dbname, stbname):
tbnames = ["t1", "t2", "t3"]
tags = [
["grade2", 1],
["grade2", 2],
["grade2", 3]
]
# prepare data
datas = [
# table 1
[
# student
[1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)],
["Mary", "Tom", "Jack", "Jane", "alex" ],
[0, 1, 1, 0, 1 ],
[98, 80, 60, 100, 99 ]
],
# table 2
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004],
["Mary2", "Tom2", "Jack2", "Jane2", "alex2" ],
[0, 1, 1, 0, 1 ],
[298, 280, 260, 2100, 299 ]
],
# table 3
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004],
["Mary3", "Tom3", "Jack3", "Jane3", "alex3" ],
[0, 1, 1, 0, 1 ],
[398, 380, 360, 3100, 399 ]
]
]
table0 = BindTable(tbnames[0], tags[0])
table1 = BindTable(tbnames[1], tags[1])
table2 = BindTable(tbnames[2], tags[2])
for data in datas[0]:
table0.add_col_data(data)
for data in datas[1]:
table1.add_col_data(data)
for data in datas[2]:
table2.add_col_data(data)
# bind with single table
stmt2.bind_param_with_tables([table0, table1, table2])
stmt2.execute()
# check correct
checkResultCorrects(conn, dbname, stbname, tbnames, tags, datas)
def do_check_invalid(stmt2, tbnames, tags, datas):
table0 = BindTable(tbnames[0], tags[0])
table1 = BindTable(tbnames[1], tags[1])
table2 = BindTable(tbnames[2], tags[2])
for data in datas[0]:
table0.add_col_data(data)
for data in datas[1]:
table1.add_col_data(data)
for data in datas[2]:
table2.add_col_data(data)
# bind with single table
try:
stmt2.bind_param_with_tables([table0, table1, table2])
stmt2.execute()
except Exception as err:
#traceback.print_stack()
print(f"failed to do_check_invalid. err={err}")
return
print(f"input invalid data passed , unexpect. \ntbnames={tbnames}\ntags={tags} \ndatas={datas} \n")
assert False
def check_input_invalid_param(conn, stmt2, dbname, stbname):
tbnames = ["t1", "t2", "t3"]
tags = [
["grade2", 1],
["grade2", 2],
["grade2", 3]
]
# prepare data
datas = [
# table 1
[
# student
[1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)],
["Mary", "Tom", "Jack", "Jane", "alex" ],
[0, 1, 1, 0, 1 ],
[98, 80, 60, 100, 99 ]
],
# table 2
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004],
["Mary2", "Tom2", "Jack2", "Jane2", "alex2" ],
[0, 1, 1, 0, 1 ],
[298, 280, 260, 2100, 299 ]
],
# table 3
[
# student
[1601481600000,1601481600001,1601481600002,1601481600003,1601481600004],
["Mary3", "Tom3", "Jack3", "Jane3", "alex3" ],
[0, 1, 1, 0, 1 ],
[398, 380, 360, 3100, 399 ]
]
]
# some tags is none
tags1 = [ ["grade2", 1], None, ["grade2", 3] ]
do_check_invalid(stmt2, tbnames, tags1, datas)
# timestamp is over range
origin = datas[0][0][0]
datas[0][0][0] = 100000000000000000000000
do_check_invalid(stmt2, tbnames, tags, datas)
datas[0][0][0] = origin # restore
# insert with single table (performance is lower)
def insert_with_normal_tables(conn, stmt2, dbname, ntb):
tbnames = [ntb]
tags = [None]
# prepare data
wkts = [None, "POINT(121.213 31.234)", "POINT(122.22 32.222)", None, "POINT(124.22 34.222)"]
wkbs = [WKB(wkt) for wkt in wkts]
datas = [
# table 1
[
# student
[1601481600000,1601481600004,"2024-09-19 10:00:00", "2024-09-19 10:00:01.123", datetime(2024,9,20,10,11,12,456)],
[b"Mary", b"tom", b"Jack", b"Jane", None ],
[0, 3.14, True, 0, 1 ],
[98, 99.87, 60, 100, 99 ],
wkbs
]
]
table0 = BindTable(tbnames[0], tags[0])
for data in datas[0]:
table0.add_col_data(data)
# bind with single table
stmt2.bind_param_with_tables([table0])
stmt2.execute()
# check correct
checkResultCorrects(conn, dbname, None, tbnames, tags, datas)
def test_stmt2_prepare_empty_sql(conn):
if not IS_V3:
print(" test_stmt2_prepare_empty_sql not support TDengine 2.X version.")
return
try:
# prepare
stmt2 = conn.statement2()
stmt2.prepare(sql='')
# should not run here
conn.close()
print("prepare empty sql ............................. failed\n")
assert False
except StatementError as err:
print("prepare empty sql ............................. ok\n")
conn.close()
def test_bind_invalid_tbnames_type():
if not IS_V3:
print(" test_bind_invalid_tbnames_type not support TDengine 2.X version.")
return
dbname = "stmt2"
stbname = "stmt2_stable"
subtbname = "stmt2_subtable"
try:
conn = taos.connect()
conn.execute(f"drop database if exists {dbname}")
conn.execute(f"create database {dbname}")
conn.select_db(dbname)
conn.execute(f"create stable {stbname} (ts timestamp, a int) tags (b int);")
conn.execute(f"create table {subtbname} using {stbname} tags(0);")
stmt2 = conn.statement2(f"insert into ? using {dbname}.{stbname} tags(?) values(?,?)")
tags = [[1]]
datas = [[[1626861392589], [1]]]
stmt2.bind_param(subtbname, tags, datas)
# should not run here
conn.close()
print("bind invalid tbnames type ..................... failed\n")
assert False
except StatementError as err:
print("bind invalid tbnames type ..................... ok\n")
conn.close()
#
# insert
#
def test_stmt2_insert(conn):
if not IS_V3:
print(" test_stmt2_query not support TDengine 2.X version.")
return
dbname = "stmt2"
stbname = "meters"
ntb1 = "ntb1"
ntb2 = "ntb2"
try:
prepare(conn, dbname, stbname, ntb1, ntb2)
ctb = 'ctb' # child table
stmt2 = conn.statement2(f"insert into {dbname}.{ctb} using {dbname}.{stbname} tags (?,?) values(?,?,?,?)")
insert_bind_param_with_table(conn, stmt2, dbname, stbname, ctb)
print("insert child table ........................... ok\n")
stmt2.close()
# # prepare
# stmt2 = conn.statement2(f"insert into ? using {dbname}.{stbname} tags(?,?) values(?,?,?,?)")
# print("insert prepare sql ............................ ok\n")
#
# # insert with table
# insert_bind_param_with_tables(conn, stmt2, dbname, stbname)
# print("insert bind with tables ....................... ok\n")
# check_input_invalid_param(conn, stmt2, dbname, stbname)
# print("check input invalid params .................... ok\n")
#
# # insert with split args
# insert_bind_param(conn, stmt2, dbname, stbname)
# print("insert bind ................................... ok\n")
# print("insert execute ................................ ok\n")
# stmt2.close()
# ntb1
stmt2 = conn.statement2(f"insert into {dbname}.{ntb1} values(?,?,?,?,?)")
insert_with_normal_tables(conn, stmt2, dbname, ntb1)
print("insert normal tables .......................... ok\n")
stmt2.close()
# ntb2
stmt2 = conn.statement2(f"insert into {dbname}.{ntb2} values(?,?,?,?,?)")
insert_bind_param_normal_tables(conn, stmt2, dbname, ntb2)
print("insert normal tables (bind param) ............. ok\n")
stmt2.close()
conn.close()
print("test_stmt2_insert ............................. [passed]\n")
except Exception as err:
#conn.execute("drop database if exists %s" % dbname)
print("test_stmt2_insert ............................. failed\n")
conn.close()
raise err
#
# ------------------------ query -------------------
#
def query_bind_param(conn, stmt2):
# set param
#tbnames = ["d2"]
tbnames = None
tags = None
datas = [
# class 1
[
# where name in ('Tom2','alex2') or score > 1000;"
["Tom2"],
[1000]
]
]
# set param
types = [FieldType.C_BINARY, FieldType.C_INT]
stmt2.set_columns_type(types)
# bind
stmt2.bind_param(tbnames, tags, datas)
# compare
def compare_result(conn, sql2, res2):
lres1 = []
lres2 = []
# shor res2
for row in res2:
log.debug(f" res2 rows = {row} \n")
lres2.append(row)
res1 = conn.query(sql2)
for row in res1:
log.debug(f" res1 rows = {row} \n")
lres1.append(row)
row1 = len(lres1)
row2 = len(lres2)
col1 = len(lres1[0])
col2 = len(lres2[0])
# check number
if row1 != row2:
err = f"two results row count different. row1={row1} row2={row2}"
raise(BaseException(err))
if col1 != col2:
err = f" two results column count different. col1={col1} col2={col2}"
raise(BaseException(err))
for i in range(row1):
for j in range(col1):
if lres1[i][j] != lres2[i][j]:
raise(f" two results data different. i={i} j={j} data1={res1[i][j]} data2={res2[i][j]}\n")
# query
def test_stmt2_query(conn):
if not IS_V3:
print(" test_stmt2_query not support TDengine 2.X version.")
return
dbname = "stmt2"
stbname = "meters"
ntb1 = "ntb1"
ntb2 = "ntb2"
sql1 = f"select * from {dbname}.d2 where name in (?) or score > ? ;"
sql2 = f"select * from {dbname}.d2 where name in ('Tom2') or score > 1000;"
try:
# prepare
prepare(conn, dbname, stbname, ntb1, ntb2)
# prepare
# stmt2 = conn.statement2(f"insert into ? using {dbname}.{stbname} tags(?,?) values(?,?,?,?)")
# insert_bind_param_with_tables(conn, stmt2, dbname, stbname)
# insert_bind_param(conn, stmt2, dbname, stbname)
# stmt2.close()
# print("insert bind & execute ......................... ok\n")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.000', 'Mary2', false, 298)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.001', 'Tom2', true, 280)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.002', 'Jack2', true, 260)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.003', 'Jane2', false, 2100)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.004', 'alex2', true, 299)")
conn.execute(f"insert into d2 using {stbname} tags('grade1', 2) values('2020-10-01 00:00:00.005', NULL, false, NULL)")
# statement2
stmt2 = conn.statement2(sql1)
print("query prepare sql ............................. ok\n")
# insert with table
#insert_bind_param_with_tables(conn, stmt2)
# bind
query_bind_param(conn, stmt2)
print("query bind param .............................. ok\n")
# query execute
stmt2.execute()
# fetch result
res2 = stmt2.result()
# check result
compare_result(conn, sql2, res2)
print("query check corrent ........................... ok\n")
#conn.execute("drop database if exists %s" % dbname)
stmt2.close()
conn.close()
print("test_stmt2_query .............................. [passed]\n")
except Exception as err:
print("query ......................................... failed\n")
conn.close()
raise err
if __name__ == "__main__":
print("start stmt2 test case...\n")
taos.log.setting(True, True, True, True, True, False)
# insert
test_stmt2_insert(taos.connect())
# query
test_stmt2_query(taos.connect())
print("end stmt2 test case.\n")

View File

@ -52,7 +52,8 @@ Python Connector 历史版本(建议使用最新版本的 `taospy`
| Python Connector 版本 | 主要变化 | TDengine 版本 |
| -------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------- |
| 2.7.18 | 支持 Apache Superset BI 产品 | - |
| 2.7.19 | 支持 Apache Superset 连接 TDengine Cloud 数据源 | - |
| 2.7.18 | 支持 Apache Superset 产品连接本地 TDengine 数据源 | - |
| 2.7.16 | 新增订阅配置 (session.timeout.ms, max.poll.interval.ms) | - |
| 2.7.15 | 新增 VARBINARY 和 GEOMETRY 类型支持 | - |
| 2.7.14 | 修复已知问题 | - |
@ -131,7 +132,8 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | 使用 JSON 类型的标签 |
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | tmq 订阅 |
| [native_all_type_query.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/native_all_type_query.py) | 支持全部类型示例 |
| [native_all_type_stmt.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/native_all_type_stmt.py) | 参数绑定支持全部类型示例 |
| [native_all_type_stmt.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/native_all_type_stmt.py) | 参数绑定 stmt 全部类型示例 |
| [insert_with_stmt2.py](https://github.com/taosdata/taos-connector-python/blob/main/tests/test_stmt2.py) | 参数绑定 stmt2 写入示例 |
示例程序源码请参考:
@ -281,7 +283,6 @@ TaosResult 对象可以通过循环遍历获取查询到的数据。
- `def close(self)`
- **接口说明** 关闭 stmt2 对象
[使用示例](http://https://github.com/taosdata/TDengine/tree/main/docs/examples/python/insert_with_stmt2.py)
#### 数据订阅
@ -458,7 +459,6 @@ TaosResult 对象可以通过循环遍历获取查询到的数据。
- `def close(self)`
- **接口说明** 关闭 stmt2 对象
[使用示例](http://https://github.com/taosdata/TDengine/tree/main/docs/examples/python/insert_with_stmt2.py)
#### 数据订阅
- **创建消费者支持属性列表**

View File

@ -196,5 +196,3 @@ check_transactions || exit 1
reset_cache || exit 1
python3 tmq_websocket_example.py
# stmt2
python3 insert_with_stmt2.py