Merge pull request #14282 from taosdata/cpwu/3.0

test: fix constant: add taos_keywords; add time_range_wise case
This commit is contained in:
Jason-Jia 2022-06-27 17:01:10 +08:00 committed by GitHub
commit 5c9752c984
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 542 additions and 107 deletions

View File

@ -1,5 +1,76 @@
# -*- coding: utf-8 -*-
# basic type
TAOS_DATA_TYPE = [
"INT", "BIGINT", "SMALLINT", "TINYINT", "INT UNSIGNED", "BIGINT UNSIGNED", "SMALLINT UNSIGNED", "TINYINT UNSIGNED",
"FLOAT", "DOUBLE",
"BOOL",
"BINARY", "NCHAR", "VARCHAR",
"TIMESTAMP",
# "MEDIUMBLOB", "BLOB", # add in 3.x
# "DECIMAL", "NUMERIC", # add in 3.x
"JSON", # only for tag
]
TAOS_NUM_TYPE = [
"INT", "BIGINT", "SMALLINT", "TINYINT", "INT UNSIGNED", "BIGINT UNSIGNED", "SMALLINT UNSIGNED", "TINYINT UNSIGNED", "FLOAT", "DOUBLE",
# "DECIMAL", "NUMERIC", # add in 3.x
]
TAOS_CHAR_TYPE = [
"BINARY", "NCHAR", "VARCHAR",
]
TAOS_BOOL_TYPE = ["BOOL",]
TAOS_TS_TYPE = ["TIMESTAMP",]
TAOS_BIN_TYPE = [
"MEDIUMBLOB", "BLOB", # add in 3.x
]
TAOS_TIME_INIT = ["b", "u", "a", "s", "m", "h", "d", "w", "n", "y"]
TAOS_PRECISION = ["ms", "us", "ns"]
PRECISION_DEFAULT = "ms"
PRECISION = PRECISION_DEFAULT
TAOS_KEYWORDS = [
"ABORT", "CREATE", "IGNORE", "NULL", "STAR",
"ACCOUNT", "CTIME", "IMMEDIATE", "OF", "STATE",
"ACCOUNTS", "DATABASE", "IMPORT", "OFFSET", "STATEMENT",
"ADD", "DATABASES", "IN", "OR", "STATE_WINDOW",
"AFTER", "DAYS", "INITIALLY", "ORDER", "STORAGE",
"ALL", "DBS", "INSERT", "PARTITIONS", "STREAM",
"ALTER", "DEFERRED", "INSTEAD", "PASS", "STREAMS",
"AND", "DELIMITERS", "INT", "PLUS", "STRING",
"AS", "DESC", "INTEGER", "PPS", "SYNCDB",
"ASC", "DESCRIBE", "INTERVAL", "PRECISION", "TABLE",
"ATTACH", "DETACH", "INTO", "PREV", "TABLES",
"BEFORE", "DISTINCT", "IS", "PRIVILEGE", "TAG",
"BEGIN", "DIVIDE", "ISNULL", "QTIME", "TAGS",
"BETWEEN", "DNODE", "JOIN", "QUERIES", "TBNAME",
"BIGINT", "DNODES", "KEEP", "QUERY", "TIMES",
"BINARY", "DOT", "KEY", "QUORUM", "TIMESTAMP",
"BITAND", "DOUBLE", "KILL", "RAISE", "TINYINT",
"BITNOT", "DROP", "LE", "REM", "TOPIC",
"BITOR", "EACH", "LIKE", "REPLACE", "TOPICS",
"BLOCKS", "END", "LIMIT", "REPLICA", "TRIGGER",
"BOOL", "EQ", "LINEAR", "RESET", "TSERIES",
"BY", "EXISTS", "LOCAL", "RESTRICT", "UMINUS",
"CACHE", "EXPLAIN", "LP", "ROW", "UNION",
"CACHELAST", "FAIL", "LSHIFT", "RP", "UNSIGNED",
"CASCADE", "FILE", "LT", "RSHIFT", "UPDATE",
"CHANGE", "FILL", "MATCH", "SCORES", "UPLUS",
"CLUSTER", "FLOAT", "MAXROWS", "SELECT", "USE",
"COLON", "FOR", "MINROWS", "SEMI", "USER",
"COLUMN", "FROM", "MINUS", "SESSION", "USERS",
"COMMA", "FSYNC", "MNODES", "SET", "USING",
"COMP", "GE", "MODIFY", "SHOW", "VALUES",
"COMPACT", "GLOB", "MODULES", "SLASH", "VARIABLE",
"CONCAT", "GRANTS", "NCHAR", "SLIDING", "VARIABLES",
"CONFLICT", "GROUP", "NE", "SLIMIT", "VGROUPS",
"CONNECTION", "GT", "NONE", "SMALLINT", "VIEW",
"CONNECTIONS", "HAVING", "NOT", "SOFFSET", "VNODES",
"CONNS", "ID", "NOTNULL", "STABLE", "WAL",
"COPY", "IF", "NOW", "STABLES", "WHERE",
]
# basic data type boundary
TINYINT_MAX = 127
TINYINT_MIN = -128
@ -11,7 +82,7 @@ SMALLINT_MAX = 32767
SMALLINT_MIN = -32768
SMALLINT_UN_MAX = 65535
MALLINT_UN_MIN = 0
SMALLINT_UN_MIN = 0
INT_MAX = 2147483647
INT_MIN = -2147483648
@ -33,8 +104,8 @@ DOUBLE_MIN = -1.7E+308
# schema boundary
BINARY_LENGTH_MAX = 16374
NCAHR_LENGTH_MAX_ = 4093
DBNAME_LENGTH_MAX_ = 64
NCAHR_LENGTH_MAX = 4093
DBNAME_LENGTH_MAX = 64
STBNAME_LENGTH_MAX = 192
STBNAME_LENGTH_MIN = 1
@ -66,4 +137,32 @@ MNODE_SHM_SIZE_DEFAULT = 6292480
VNODE_SHM_SIZE_MAX = 2147483647
VNODE_SHM_SIZE_MIN = 6292480
VNODE_SHM_SIZE_DEFAULT = 31458304
VNODE_SHM_SIZE_DEFAULT = 31458304
# time_init
TIME_MS = 1
TIME_US = TIME_MS/1000
TIME_NS = TIME_US/1000
TIME_S = 1000 * TIME_MS
TIME_M = 60 * TIME_S
TIME_H = 60 * TIME_M
TIME_D = 24 * TIME_H
TIME_W = 7 * TIME_D
TIME_N = 30 * TIME_D
TIME_Y = 365 * TIME_D
# session parameters
INTERVAL_MIN = 1 * TIME_MS if PRECISION == PRECISION_DEFAULT else 1 * TIME_US
# streams and related agg-function
SMA_INDEX_FUNCTIONS = ["MIN", "MAX"]
ROLLUP_FUNCTIONS = ["AVG", "SUM", "MIN", "MAX", "LAST", "FIRST"]
SMA_WATMARK_MAXDELAY_INIT = ['a', "s", "m"]
WATERMARK_MAX = 900000
WATERMARK_MIN = 0
MAX_DELAY_MAX = 900000
MAX_DELAY_MIN = 1

View File

@ -21,6 +21,7 @@ import psutil
import shutil
import pandas as pd
from util.log import *
from util.constant import *
def _parse_datetime(timestr):
try:
@ -117,8 +118,7 @@ class TDSql:
col_name_list = []
col_type_list = []
self.cursor.execute(sql)
self.queryCols = self.cursor.description
for query_col in self.queryCols:
for query_col in self.cursor.description:
col_name_list.append(query_col[0])
col_type_list.append(query_col[1])
except Exception as e:
@ -301,6 +301,41 @@ class TDSql:
args = (caller.filename, caller.lineno, self.sql, elm, expect_elm)
tdLog.exit("%s(%d) failed: sql:%s, elm:%s == expect_elm:%s" % args)
def get_times(self, time_str, precision="ms"):
caller = inspect.getframeinfo(inspect.stack()[1][0])
if time_str[-1] not in TAOS_TIME_INIT:
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: {time_str} not a standard taos time init")
if precision not in TAOS_PRECISION:
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: {precision} not a standard taos time precision")
if time_str[-1] == TAOS_TIME_INIT[0]:
times = int(time_str[:-1]) * TIME_NS
if time_str[-1] == TAOS_TIME_INIT[1]:
times = int(time_str[:-1]) * TIME_US
if time_str[-1] == TAOS_TIME_INIT[2]:
times = int(time_str[:-1]) * TIME_MS
if time_str[-1] == TAOS_TIME_INIT[3]:
times = int(time_str[:-1]) * TIME_S
if time_str[-1] == TAOS_TIME_INIT[4]:
times = int(time_str[:-1]) * TIME_M
if time_str[-1] == TAOS_TIME_INIT[5]:
times = int(time_str[:-1]) * TIME_H
if time_str[-1] == TAOS_TIME_INIT[6]:
times = int(time_str[:-1]) * TIME_D
if time_str[-1] == TAOS_TIME_INIT[7]:
times = int(time_str[:-1]) * TIME_W
if time_str[-1] == TAOS_TIME_INIT[8]:
times = int(time_str[:-1]) * TIME_N
if time_str[-1] == TAOS_TIME_INIT[9]:
times = int(time_str[:-1]) * TIME_Y
if precision == "ms":
return int(times)
elif precision == "us":
return int(times*1000)
elif precision == "ns":
return int(times*1000*1000)
def taosdStatus(self, state):
tdLog.sleep(5)
pstate = 0

View File

@ -21,9 +21,9 @@ SINT_UN_COL = "c_sint_un"
BINT_UN_COL = "c_bint_un"
INT_UN_COL = "c_int_un"
BINARY_COL = "c8"
NCHAR_COL = "c9"
TS_COL = "c10"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [ INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [ BINARY_COL, NCHAR_COL, ]
@ -51,12 +51,28 @@ class DataSet:
binary_data : List[str] = None
nchar_data : List[str] = None
def __post_init__(self):
self.ts_data = []
self.int_data = []
self.bint_data = []
self.sint_data = []
self.tint_data = []
self.int_un_data = []
self.bint_un_data = []
self.sint_un_data = []
self.tint_un_data = []
self.float_data = []
self.double_data = []
self.bool_data = []
self.binary_data = []
self.nchar_data = []
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
tdSql.init(conn.cursor(), False)
@property
def create_databases_sql_err(self):
@ -87,28 +103,28 @@ class TDTestCase:
@property
def create_stable_sql_err(self):
return [
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s maxdelay 1m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(ceil) watermark 1s max_delay 1m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(count) watermark 1min",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) maxdelay -1s",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay -1s",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark -1m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) maxdelay 1m ",
# f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 1m ",
# f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) max_delay 1m ",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} binary(16)) tags (tag1 int) rollup(avg) watermark 1s",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) rollup(avg) maxdelay 1m",
# f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) rollup(avg) watermark 1s maxdelay 1s",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) rollup(avg) max_delay 1m",
# f"create table ntb_1 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) rollup(avg) watermark 1s max_delay 1s",
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) tags (tag1 int) " ,
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) " ,
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) " ,
# f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int, {BINARY_COL} nchar(16)) " ,
# watermark, maxdelay: [0, 900000], [ms, s, m, ?]
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) maxdelay 1u",
# watermark, max_delay: [0, 900000], [ms, s, m, ?]
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1u",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 1b",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 900001ms",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) maxdelay 16m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) maxdelay 901s",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) maxdelay 1h",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) maxdelay 0.2h",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 16m",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 901s",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 1h",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) max_delay 0.2h",
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 0.002d",
]
@ -117,11 +133,11 @@ class TDTestCase:
def create_stable_sql_current(self):
return [
f"create stable stb1 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(avg)",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 5s maxdelay 1m",
f"create stable stb3 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(max) watermark 5s maxdelay 1m",
f"create stable stb4 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum) watermark 5s maxdelay 1m",
# f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s maxdelay 1m",
# f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s maxdelay 1m",
f"create stable stb2 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(min) watermark 5s max_delay 1m",
f"create stable stb3 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(max) watermark 5s max_delay 1m",
f"create stable stb4 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(sum) watermark 5s max_delay 1m",
# f"create stable stb5 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(last) watermark 5s max_delay 1m",
# f"create stable stb6 ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) rollup(first) watermark 5s max_delay 1m",
]
def test_create_stb(self):
@ -135,7 +151,7 @@ class TDTestCase:
tdSql.checkRows(len(self.create_stable_sql_current))
# tdSql.execute("use db") # because db is a noraml database, not a rollup database, should not be able to create a rollup database
# tdSql.error(f"create stable nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) file_factor 5.0")
# tdSql.error(f"create stable nor_db_rollup_stb ({PRIMARY_COL} timestamp, {INT_COL} int) tags (tag1 int) watermark 5s max_delay 1m")
def test_create_databases(self):
@ -177,21 +193,6 @@ class TDTestCase:
def __data_set(self, rows):
data_set = DataSet()
# neg_data_set = DataSet()
data_set.ts_data = []
data_set.int_data = []
data_set.bint_data = []
data_set.sint_data = []
data_set.tint_data = []
data_set.int_un_data = []
data_set.bint_un_data = []
data_set.sint_un_data = []
data_set.tint_un_data = []
data_set.float_data = []
data_set.double_data = []
data_set.bool_data = []
data_set.binary_data = []
data_set.nchar_data = []
for i in range(rows):
data_set.ts_data.append(NOW + 1 * (rows - i))
@ -226,6 +227,7 @@ class TDTestCase:
return data_set
def __insert_data(self):
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
data = self.__data_set(rows=self.rows)
# now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
@ -264,10 +266,10 @@ class TDTestCase:
def run(self):
self.rows = 10
tdSql.prepare()
tdLog.printNoPrefix("==========step0:all check")
# self.all_test()
self.all_test()
tdLog.printNoPrefix("==========step1:create table in normal database")
tdSql.prepare()

View File

@ -17,25 +17,34 @@ TINT_COL = "c_tint"
FLOAT_COL = "c_float"
DOUBLE_COL = "c_double"
BOOL_COL = "c_bool"
TINT_UN_COL = "c_tint_un"
SINT_UN_COL = "c_sint_un"
BINT_UN_COL = "c_bint_un"
INT_UN_COL = "c_int_un"
TINT_UN_COL = "c_utint"
SINT_UN_COL = "c_usint"
BINT_UN_COL = "c_ubint"
INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [BOOL_COL, ]
TS_TYPE_COL = [TS_COL, ]
INT_TAG = "t_int"
ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL]
TAG_COL = [INT_TAG]
# insert data args
TIME_STEP = 10000
NOW = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
# init db/table
DBNAME = "db"
STBNAME = "stb1"
CTBNAME = "ct1"
NTBNAME = "nt1"
@dataclass
class DataSet:
@ -73,19 +82,25 @@ class DataSet:
@dataclass
class SMAschema:
creation : str = "CREATE"
index_name : str = "sma_index_1"
index_flag : str = "SMA INDEX"
operator : str = "ON"
tbname : str = None
watermark : str = None
maxdelay : str = None
func : Tuple[str] = None
interval : Tuple[str] = None
sliding : str = None
other : Any = None
drop : str = "DROP"
drop_flag : str = "INDEX"
creation : str = "CREATE"
index_name : str = "sma_index_1"
index_flag : str = "SMA INDEX"
operator : str = "ON"
tbname : str = None
watermark : str = "5s"
max_delay : str = "6m"
func : Tuple[str] = None
interval : Tuple[str] = ("6m", "10s")
sliding : str = "6m"
other : Any = None
drop : str = "DROP"
drop_flag : str = "INDEX"
querySmaOptimize : int = 1
show : str = "SHOW"
show_msg : str = "INDEXES"
show_oper : str = "FROM"
dbname : str = None
rollup_db : bool = False
def __post_init__(self):
if isinstance(self.other, dict):
@ -111,8 +126,8 @@ class SMAschema:
self.watermark = v
del self.other[k]
if k.lower() == "maxdelay" and isinstance(v, str) and not self.maxdelay:
self.maxdelay = v
if k.lower() == "max_delay" and isinstance(v, str) and not self.max_delay:
self.max_delay = v
del self.other[k]
if k.lower() == "functions" and isinstance(v, tuple) and not self.func:
@ -131,12 +146,36 @@ class SMAschema:
self.drop_flag = v
del self.other[k]
if k.lower() == "show_msg" and isinstance(v, str) and not self.show_msg:
self.show_msg = v
del self.other[k]
if k.lower() == "dbname" and isinstance(v, str) and not self.dbname:
self.dbname = v
del self.other[k]
if k.lower() == "show_oper" and isinstance(v, str) and not self.show_oper:
self.show_oper = v
del self.other[k]
if k.lower() == "rollup_db" and isinstance(v, bool) and not self.rollup_db:
self.rollup_db = v
del self.other[k]
# from ...pytest.util.sql import *
# from ...pytest.util.constant import *
class TDTestCase:
updatecfgDict = {"querySmaOptimize": 1}
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
self.precision = "ms"
self.sma_count = 0
self.sma_created_index = []
"""
create sma index :
@ -155,13 +194,17 @@ class TDTestCase:
if sma.func:
sql += f" function({', '.join(sma.func)})"
if sma.interval:
sql += f" interval({', '.join(sma.interval)})"
interval, offset = self.__get_interval_offset(sma.interval)
if offset:
sql += f" interval({interval}, {offset})"
else:
sql += f" interval({interval})"
if sma.sliding:
sql += f" sliding({sma.sliding})"
if sma.watermark:
sql += f" watermark {sma.watermark}"
if sma.maxdelay:
sql += f" maxdelay {sma.maxdelay}"
if sma.max_delay:
sql += f" max_delay {sma.max_delay}"
if isinstance(sma.other, dict):
for k,v in sma.other.items():
if isinstance(v,tuple) or isinstance(v, list):
@ -171,53 +214,305 @@ class TDTestCase:
if isinstance(sma.other, tuple) or isinstance(sma.other, list):
sql += " ".join(sma.other)
if isinstance(sma.other, int) or isinstance(sma.other, float) or isinstance(sma.other, str):
sql += sma.other
sql += f" {sma.other}"
return sql
def sma_create_check(self, sma:SMAschema):
def __get_sma_func_col(self, func):
cols = []
if isinstance(func, str):
cols.append( func.split("(")[-1].split(")")[0] )
elif isinstance(func, tuple) or isinstance(func, list):
for func_col in func:
cols.append(func_col.split("(")[-1].split(")")[0])
else:
cols = []
return cols
def __check_sma_func(self, func:tuple):
if not isinstance(func, str) and not isinstance(func, tuple) and not isinstance(func, list):
return False
if isinstance(func, str) :
if "(" not in func or ")" not in func:
return False
if func.split("(")[0].upper() not in SMA_INDEX_FUNCTIONS:
return False
if func.split("(")[1].split(")")[0] not in ALL_COL and func.split("(")[1].split(")")[0] not in TAG_COL :
return False
if isinstance(func, tuple) or isinstance(func, list):
for arg in func:
if not isinstance(arg, str):
return False
if "(" not in arg or ")" not in arg:
return False
if arg.split("(")[0].upper() not in SMA_INDEX_FUNCTIONS:
return False
if arg.split("(")[1].split(")")[0] not in ALL_COL and arg.split("(")[1].split(")")[0] not in TAG_COL :
return False
return True
def __check_sma_watermark(self, arg):
if not arg:
return False
if not isinstance(arg, str):
return False
if arg[-1] not in SMA_WATMARK_MAXDELAY_INIT:
return False
if len(arg) == 1:
return False
if not arg[:-1].isdecimal():
return False
if tdSql.get_times(arg) > WATERMARK_MAX:
return False
if tdSql.get_times(arg) < WATERMARK_MIN:
return False
return True
def __check_sma_max_delay(self, arg):
if not self.__check_sma_watermark(arg):
return False
if tdSql.get_times(arg) < MAX_DELAY_MIN:
return False
return True
def __check_sma_sliding(self, arg):
if not isinstance(arg, str):
return False
if arg[-1] not in TAOS_TIME_INIT:
return False
if len(arg) == 1:
return False
if not arg[:-1].isdecimal():
return False
return True
def __get_interval_offset(self, args):
if isinstance(args, str):
interval, offset = args, None
elif isinstance(args,tuple) or isinstance(args, list):
if len(args) == 1:
interval, offset = args[0], None
elif len(args) == 2:
interval, offset = args
else:
interval, offset = False, False
else:
interval, offset = False, False
return interval, offset
def __check_sma_interval(self, args):
if not isinstance(args, tuple) and not isinstance(args,str):
return False
interval, offset = self.__get_interval_offset(args)
if not interval:
return False
if not self.__check_sma_sliding(interval):
return False
if tdSql.get_times(interval) < INTERVAL_MIN:
return False
if offset:
if not self.__check_sma_sliding(offset):
return False
if tdSql.get_times(interval) <= tdSql.get_times(offset) :
return False
return True
def __sma_create_check(self, sma:SMAschema):
if self.updatecfgDict["querySmaOptimize"] == 0:
return False
# # TODO: if database is a rollup-db, can not create sma index
# tdSql.query("select database()")
# if sma.rollup_db :
# return False
tdSql.query("show stables")
if not sma.tbname:
return False
stb_in_list = False
for row in tdSql.queryResult:
if sma.tbname == row[0]:
stb_in_list = True
break
if not stb_in_list:
tdSql.error(self.__create_sma_index(sma))
if not sma.creation:
tdSql.error(self.__create_sma_index(sma))
if not sma.index_flag:
tdSql.error(self.__create_sma_index(sma))
if not sma.index_name:
tdSql.error(self.__create_sma_index(sma))
if not sma.operator:
tdSql.error(self.__create_sma_index(sma))
if not sma.tbname:
tdSql.error(self.__create_sma_index(sma))
if not sma.func:
tdSql.error(self.__create_sma_index(sma))
if not sma.interval:
tdSql.error(self.__create_sma_index(sma))
if not sma.sliding:
tdSql.error(self.__create_sma_index(sma))
return False
if not sma.creation or not isinstance(sma.creation, str) or sma.creation.upper() != "CREATE":
return False
if not sma.index_flag or not isinstance(sma.index_flag, str) or sma.index_flag.upper() != "SMA INDEX" :
return False
if not sma.index_name or not isinstance(sma.index_name, str) or sma.index_name.upper() in TAOS_KEYWORDS:
return False
if not sma.operator or not isinstance(sma.operator, str) or sma.operator.upper() != "ON":
return False
if not sma.func or not self.__check_sma_func(sma.func):
return False
tdSql.query(f"desc {sma.tbname}")
_col_list = []
for col_row in tdSql.queryResult:
_col_list.append(col_row[0])
_sma_func_cols = self.__get_sma_func_col(sma.func)
for _sma_func_col in _sma_func_cols:
if _sma_func_col not in _col_list:
return False
if sma.sliding and not self.__check_sma_sliding(sma.sliding):
return False
interval, _ = self.__get_interval_offset(sma.interval)
if not sma.interval or not self.__check_sma_interval(sma.interval) :
return False
if sma.sliding and tdSql.get_times(interval) < tdSql.get_times(sma.sliding):
return False
if sma.watermark and not self.__check_sma_watermark(sma.watermark):
return False
if sma.max_delay and not self.__check_sma_max_delay(sma.max_delay):
return False
if sma.other:
return False
return True
def sma_create_check(self, sma:SMAschema):
if self.__sma_create_check(sma):
tdSql.query(self.__create_sma_index(sma))
self.sma_count += 1
self.sma_created_index.append(sma.index_name)
tdSql.query("show streams")
tdSql.checkRows(self.sma_count)
else:
tdSql.error(self.__create_sma_index(sma))
def __drop_sma_index(self, sma:SMAschema):
sql = f"{sma.drop} {sma.drop_flag} {sma.index_name}"
return sql
def __sma_drop_check(self, sma:SMAschema):
if not sma.drop:
return False
if not sma.drop_flag:
return False
if not sma.index_name:
return False
return True
def sma_drop_check(self, sma:SMAschema):
if self.__sma_drop_check(sma):
tdSql.query(self.__drop_sma_index(sma))
print(self.__drop_sma_index(sma))
self.sma_count -= 1
self.sma_created_index = list(filter(lambda x: x != sma.index_name, self.sma_created_index))
tdSql.query("show streams")
tdSql.checkRows(self.sma_count)
else:
tdSql.error(self.__drop_sma_index(sma))
def __show_sma_index(self, sma:SMAschema):
sql = f"{sma.show} {sma.show_msg} {sma.show_oper} {sma.tbname}"
return sql
def __sma_show_check(self, sma:SMAschema):
if not sma.show:
return False
if not sma.show_msg:
return False
if not sma.show_oper:
return False
if not sma.tbname:
return False
return True
def sma_show_check(self, sma:SMAschema):
if self.__sma_show_check(sma):
tdSql.query(self.__show_sma_index(sma))
tdSql.checkRows(self.sma_count)
else:
tdSql.error(self.__show_sma_index(sma))
@property
def __create_sma_sql(self):
err_sqls = []
cur_sqls = []
# err_set
# # case 1: required fields check
err_sqls.append( SMAschema(creation="", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(index_name="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(index_flag="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(operator="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(tbname="", func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(func=("",),tbname=STBNAME ) )
err_sqls.append( SMAschema(interval=(""),tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
# # case 2: err fields
err_sqls.append( SMAschema(creation="show",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(creation="alter",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(creation="select",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(index_flag="SMA INDEXES", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(index_flag="SMA INDEX ,", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
err_sqls.append( SMAschema(index_name="tbname", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
# current_set
cur_sqls.append( SMAschema(max_delay="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
cur_sqls.append( SMAschema(watermark="",index_name="sma_index_2",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
cur_sqls.append( SMAschema(sliding="",index_name='sma_index_3',tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
return err_sqls, cur_sqls
def test_create_sma(self):
err_sqls , cur_sqls = self.__create_sma_sql
for err_sql in err_sqls:
self.sma_create_check(err_sql)
for cur_sql in cur_sqls:
self.sma_create_check(cur_sql)
@property
def __drop_sma_sql(self):
err_sqls = []
cur_sqls = []
# err_set
## case 1: required fields check
err_sqls.append( SMAschema(drop="") )
err_sqls.append( SMAschema(drop_flag="") )
err_sqls.append( SMAschema(index_name="") )
for index in self.sma_created_index:
cur_sqls.append(SMAschema(index_name=index))
return err_sqls, cur_sqls
def test_drop_sma(self):
err_sqls , cur_sqls = self.__drop_sma_sql
for err_sql in err_sqls:
self.sma_drop_check(err_sql)
# for cur_sql in cur_sqls:
# self.sma_drop_check(cur_sql)
def all_test(self):
self.test_create_sma()
self.test_drop_sma()
pass
def __create_tb(self):
tdLog.printNoPrefix("==========step: create table")
create_stb_sql = f'''create table stb1(
create_stb_sql = f'''create table {STBNAME}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
) tags (tag1 int)
) tags ({INT_TAG} int)
'''
create_ntb_sql = f'''create table t1(
create_ntb_sql = f'''create table {NTBNAME}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
@ -253,6 +548,7 @@ class TDTestCase:
return data_set
def __insert_data(self):
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
data = self.__data_set(rows=self.rows)
# now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
@ -278,7 +574,7 @@ class TDTestCase:
tdSql.execute(
f"insert into ct4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )")
tdSql.execute(
f"insert into t1 values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )")
f"insert into {NTBNAME} values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )")
tdSql.execute(
f"insert into ct2 values ( {NOW + int(TIME_STEP * 0.6)}, {null_data} )")
@ -295,28 +591,31 @@ class TDTestCase:
f"insert into ct4 values ( {NOW - self.rows * int(TIME_STEP * 0.39)}, {null_data} )")
tdSql.execute(
f"insert into t1 values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )")
f"insert into {NTBNAME} values ( {NOW + int(TIME_STEP * 1.2)}, {null_data} )")
tdSql.execute(
f"insert into t1 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )")
f"insert into {NTBNAME} values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )")
tdSql.execute(
f"insert into t1 values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )")
f"insert into {NTBNAME} values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )")
def run(self):
sma1 = SMAschema(func=("min(c1)","max(c2)"))
sql1 = self.__create_sma_index(sma1)
print("================")
print(sql1)
# a = DataSet()
# return
self.rows = 10
tdLog.printNoPrefix("==========step0:all check")
# self.all_test()
tdLog.printNoPrefix("==========step1:create table in normal database")
tdSql.prepare()
self.__create_tb()
self.__insert_data()
# self.__insert_data()
self.all_test()
# drop databases, create same name db、stb and sma index
# tdSql.prepare()
# self.__create_tb()
# self.__insert_data()
# self.all_test()
return
tdLog.printNoPrefix("==========step2:create table in rollup database")

View File

@ -28,7 +28,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
tdSql.init(conn.cursor(), False)
def __query_condition(self,tbname):
query_condition = []

View File

@ -28,7 +28,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), True)
tdSql.init(conn.cursor(), False)
def __query_condition(self,tbname):
query_condition = []

View File

@ -31,7 +31,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(),False)
def __substr_condition(self): # sourcery skip: extract-method
substr_condition = []