From 8ad3a5802d209648bd2d0c9f1bfc57641cfc520b Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Wed, 12 Feb 2025 23:33:31 +0800 Subject: [PATCH] feat(stream)[TS-5469]: add tests for stream event notifications Add functional tests for different stream processing scenarios and window types. Also include tests for network failure cases to ensure robustness. --- tests/army/stream/stream_notify.json | 71 +++ tests/army/stream/stream_notify_disorder.json | 72 +++ tests/army/stream/stream_notify_server.py | 55 ++ tests/army/stream/test_stream_notify.py | 473 ++++++++++++++++++ tests/parallel_test/longtimeruning_cases.task | 1 + .../0-others/information_schema.py | 2 +- tools/taos-tools/src/benchUtil.c | 8 + 7 files changed, 681 insertions(+), 1 deletion(-) create mode 100644 tests/army/stream/stream_notify.json create mode 100644 tests/army/stream/stream_notify_disorder.json create mode 100644 tests/army/stream/stream_notify_server.py create mode 100644 tests/army/stream/test_stream_notify.py diff --git a/tests/army/stream/stream_notify.json b/tests/army/stream/stream_notify.json new file mode 100644 index 0000000000..9dcbe4efb2 --- /dev/null +++ b/tests/army/stream/stream_notify.json @@ -0,0 +1,71 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "create_table_thread_count": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "num_of_records_per_req": 10000, + "prepared_rand": 10000, + "chinese": "no", + "escape_character": "yes", + "continue_if_fail": "no", + "databases": [ + { + "dbinfo": { + "name": "test", + "drop": "no", + "vgroups": 4, + "precision": "ms" + }, + "super_tables": [ + { + "name": "st", + "child_table_exists": "no", + "childtable_count": 5, + "childtable_prefix": "ct", + "auto_create_table": "yes", + "batch_create_tbl_num": 5, + "data_source": "rand", + "insert_mode": "taosc", + "non_stop_mode": "no", + "line_protocol": "line", + "insert_rows": 10000, + "childtable_limit": 0, + "childtable_offset": 0, + "interlace_rows": 50, + "insert_interval": 10, + "partial_col_num": 0, + "timestamp_step": 500, + "start_timestamp": "2025-01-13 17:30:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "use_sample_ts": "no", + "tags_file": "", + "columns": [ + {"type": "TINYINT", "name": "c0"}, + {"type": "SMALLINT", "name": "c1"}, + {"type": "INT", "name": "c2"}, + {"type": "BIGINT", "name": "c3"}, + {"type": "DOUBLE", "name": "c4"}, + {"type": "FLOAT", "name": "c5"}, + {"type": "BOOL", "name": "c6"}, + {"type": "VARCHAR", "name": "c7", "len": 10}, + {"type": "NCHAR", "name": "c8", "len": 10}, + {"type": "UTINYINT", "name": "c9"}, + {"type": "USMALLINT", "name": "c10"}, + {"type": "UINT", "name": "c11"}, + {"type": "UBIGINT", "name": "c12"} + ], + "tags": [ + {"type": "INT", "name": "groupid", "max": 100, "min": 1} + ] + } + ] + } + ] +} diff --git a/tests/army/stream/stream_notify_disorder.json b/tests/army/stream/stream_notify_disorder.json new file mode 100644 index 0000000000..2f9f9bea69 --- /dev/null +++ b/tests/army/stream/stream_notify_disorder.json @@ -0,0 +1,72 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 4, + "create_table_thread_count": 4, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "num_of_records_per_req": 10000, + "prepared_rand": 10000, + "chinese": "no", + "escape_character": "yes", + "continue_if_fail": "no", + "databases": [ + { + "dbinfo": { + "name": "test", + "drop": "no", + "vgroups": 4, + "precision": "ms" + }, + "super_tables": [ + { + "name": "st", + "child_table_exists": "no", + "childtable_count": 5, + "childtable_prefix": "ct", + "auto_create_table": "yes", + "batch_create_tbl_num": 5, + "data_source": "rand", + "insert_mode": "taosc", + "non_stop_mode": "no", + "line_protocol": "line", + "insert_rows": 10000, + "disorder_ratio": 10, + "childtable_limit": 0, + "childtable_offset": 0, + "interlace_rows": 50, + "insert_interval": 10, + "partial_col_num": 0, + "timestamp_step": 500, + "start_timestamp": "2025-01-13 17:30:00.000", + "sample_format": "csv", + "sample_file": "./sample.csv", + "use_sample_ts": "no", + "tags_file": "", + "columns": [ + {"type": "TINYINT", "name": "c0"}, + {"type": "SMALLINT", "name": "c1"}, + {"type": "INT", "name": "c2"}, + {"type": "BIGINT", "name": "c3"}, + {"type": "DOUBLE", "name": "c4"}, + {"type": "FLOAT", "name": "c5"}, + {"type": "BOOL", "name": "c6"}, + {"type": "VARCHAR", "name": "c7", "len": 10}, + {"type": "NCHAR", "name": "c8", "len": 10}, + {"type": "UTINYINT", "name": "c9"}, + {"type": "USMALLINT", "name": "c10"}, + {"type": "UINT", "name": "c11"}, + {"type": "UBIGINT", "name": "c12"} + ], + "tags": [ + {"type": "INT", "name": "groupid", "max": 100, "min": 1} + ] + } + ] + } + ] +} diff --git a/tests/army/stream/stream_notify_server.py b/tests/army/stream/stream_notify_server.py new file mode 100644 index 0000000000..a105d55971 --- /dev/null +++ b/tests/army/stream/stream_notify_server.py @@ -0,0 +1,55 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import asyncio +import signal +import websockets +import argparse + +stop_event = asyncio.Event() + +async def handle_websocket(websocket, log_file): + try: + # Write the message to the specified log file + if log_file != "": + with open(log_file, "a", encoding="utf-8") as f: + async for message in websocket: + f.write(message + "\n") + if stop_event.is_set(): + break + except Exception as e: + print(f"Connection closed with error: {e}") + +async def listen(port, log_file): + async with websockets.serve( + lambda ws: handle_websocket(ws, log_file), + "0.0.0.0", + port, + ping_timeout = None, + max_size= 10 * 1024 * 1024 # 10MB, + ): + print(f"WebSocket server listening on port {port}...") + await stop_event.wait() # Run forever (until canceled) + +def signal_handler(sig, frame): + stop_event.set() + +if __name__ == '__main__': + signal.signal(signal.SIGINT, signal_handler) + + parser = argparse.ArgumentParser() + parser.add_argument('-d', '--log_file', type=str, default='stream_notify_server.log', help='log file') + parser.add_argument('-p', '--port', type=int, default=12345, help='port number') + args = parser.parse_args() + + asyncio.run(listen(args.port, args.log_file)) diff --git a/tests/army/stream/test_stream_notify.py b/tests/army/stream/test_stream_notify.py new file mode 100644 index 0000000000..0e6bbd5c01 --- /dev/null +++ b/tests/army/stream/test_stream_notify.py @@ -0,0 +1,473 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from frame import etool +from frame.etool import * +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame.common import * +import signal +import subprocess + +class StreamNotifyServer: + def __init__(self): + self.log_file = "" + self.sub_process = None + + def __del__(self): + self.stop() + + def run(self, port, log_file): + tdLog.info(f"Start notify server: python3 {etool.curFile(__file__, 'stream_notify_server.py')} -p {port} -d {log_file}") + self.sub_process = subprocess.Popen(['python3', etool.curFile(__file__, 'stream_notify_server.py'), '-p', str(port), '-d', log_file]) + self.log_file = log_file + + def stop(self): + if self.sub_process is not None: + self.sub_process.send_signal(signal.SIGINT) + try: + self.sub_process.wait(60) + except subprocess.TimeoutExpired: + self.sub_process.kill() + +class TestStreamNotifySinglePass(): + def __init__(self, num_addr_per_stream, trigger_mode, notify_event, disorder): + self.current_dir = os.path.dirname(os.path.abspath(__file__)) + self.num_addr_per_stream = num_addr_per_stream + self.trigger_mode = trigger_mode + self.notify_event = notify_event + self.disorder = disorder + self.streams = [] + self.id = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + + def is_port_in_use(self, port): + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(("127.0.0.1", port)) == 0 + + def gen_streams(self): + self.streams = [ + { + "stream_name": "s_time_par", + "dest_table": "dst_time_par", + "window_clause": "interval(5s)", + "partitioned": True, + }, + { + "stream_name": "s_time", + "dest_table": "dst_time", + "window_clause": "interval(5s)", + "partitioned": False, + }, + { + "stream_name": "s_state_par", + "dest_table": "dst_state_par", + "window_clause": "state_window(c6)", + "partitioned": True, + }, + { + "stream_name": "s_session_par", + "dest_table": "dst_session_par", + "window_clause": "session(ts, 50a)", + "partitioned": True, + }, + { + "stream_name": "s_session", + "dest_table": "dst_session", + "window_clause": "session(ts, 50a)", + "partitioned": False, + }, + { + "stream_name": "s_event_par", + "dest_table": "dst_event_par", + "window_clause": "event_window start with c6 = true end with c6 = false", + "partitioned": True, + }, + { + "stream_name": "s_count_par", + "dest_table": "dst_count_par", + "window_clause": "count_window(10)", + "partitioned": True, + }, + ] + # set port to random number between 10000 and 20000 + port = random.randint(10000, 20000) + for stream in self.streams: + stream["notify_address"] = "" + stream["notify_server"] = [] + if self.trigger_mode == "FORCE_WINDOW_CLOSE": + if stream["stream_name"] == "s_time" or stream["stream_name"] == "s_session": + continue + elif "MAX_DELAY" in self.trigger_mode or "AT_ONCE" in self.trigger_mode or self.disorder: + if stream["stream_name"] == "s_session_par" or stream["stream_name"] == "s_state_par": + continue + for i in range(self.num_addr_per_stream): + # Find an available port + while self.is_port_in_use(port): + port += 1 + # Start the stream notify server and add the address to the stream + log_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.log" + if os.path.exists(log_file): + os.remove(log_file) + server = StreamNotifyServer() + server.run(port, log_file) + stream["notify_address"] += f"'ws://127.0.0.1:{port}'," + stream["notify_server"].append(server) + port += 1 + stream["notify_address"] = stream["notify_address"][:-1] + + def create_streams(self): + tdLog.info("==========step1:create table") + tdSql.execute("drop database if exists test;") + tdSql.execute("create database test keep 3650;") + tdSql.execute("use test;") + tdSql.execute( + f"""create stable if not exists test.st + (ts timestamp, c0 tinyint, c1 smallint, c2 int, c3 bigint, c4 double, c5 float, c6 bool, c7 varchar(10), c8 nchar(10), c9 tinyint unsigned, c10 smallint unsigned, c11 int unsigned, c12 bigint unsigned) + tags(groupid int); + """ + ) + for stream in self.streams: + if len(stream["notify_server"]) == 0: + continue + stream_option = f"TRIGGER {self.trigger_mode}" + if self.trigger_mode != "FORCE_WINDOW_CLOSE": + stream_option += " IGNORE UPDATE 0" + if not stream["stream_name"].startswith("s_count"): + stream_option += " IGNORE EXPIRED 0" + if stream["stream_name"].startswith("s_count"): + stream_option += " WATERMARK 1a" + stream_sql = f"""create stream {stream["stream_name"]} {stream_option} into {stream["dest_table"]} as + select _wstart, _wend, min(c0), max(c1), min(c2), max(c3), min(c4), max(c5), first(c6), first(c7), last(c8), min(c9), max(c10), min(c11), max(c12) + from test.st {stream["partitioned"] and "partition by tbname" or ""} + {stream["window_clause"]} + notify ({stream["notify_address"]}) on ({self.notify_event}); + """ + tdSql.execute(stream_sql, show=True) + # Wait for the stream tasks to be ready + for i in range(50): + tdLog.info(f"i={i} wait for stream tasks ready ...") + time.sleep(1) + rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';") + if rows == 0: + break + + def insert_data(self): + tdLog.info("insert stream notify test data.") + # taosBenchmark run + json_file = self.disorder and "stream_notify_disorder.json" or "stream_notify.json" + json = etool.curFile(__file__, json_file) + etool.benchMark(json=json) + + def wait_all_streams_done(self): + while True: + tdLog.info("wait for all streams done ...") + time.sleep(10) + rows = tdSql.query("select stream_name, level, notify_event_stat from information_schema.ins_stream_tasks where notify_event_stat is not null;") + num_pushed = 0 + num_sent = 0 + for i in range(rows): + tdLog.printNoPrefix(f"{tdSql.getData(i, 0)}, {tdSql.getData(i, 1)}, {tdSql.getData(i, 2)}") + notify_event_stat = tdSql.getData(i, 2) + match = re.search(r"Push (\d+)x, (\d+) elems", notify_event_stat) + if match: + num_pushed += int(match.group(2)) + match = re.search(r"Send (\d+)x, (\d+) elems", notify_event_stat) + if match: + num_sent += int(match.group(2)) + if num_pushed == num_sent: + break + tdLog.info("wait for all notify servers stop ...") + for stream in self.streams: + for server in stream["notify_server"]: + server.stop() + + def parse(self, log_file, out_file, stream_name): + message_ids = set() + events_map = {} + has_open = "window_open" in self.notify_event + has_close = "window_close" in self.notify_event + with open(log_file, "r", encoding="utf-8") as f: + for line in f: + data = json.loads(line) + + # Check if the data has the required fields: messageId, timestamp, streams + if "messageId" not in data: + print(f"Error: Missing 'messageId' in data {data}") + return False + if "timestamp" not in data: + print(f"Error: Missing 'timestamp' in data {data}") + return False + if "streams" not in data: + print(f"Error: Missing 'streams' in data {data}") + return False + + # Check if the message id is duplicated + if message_ids.__contains__(data["messageId"]): + print(f"Error: Duplicate message id {data['messageId']}") + return False + message_ids.add(data["messageId"]) + + # Check if the streams is correct + for stream in data["streams"]: + # Check if the stream has the required fields: streamName, events + if "streamName" not in stream: + print(f"Error: Missing 'streamName' in stream {stream}") + return False + if "events" not in stream: + print(f"Error: Missing 'events' in stream {stream}") + return False + + # Check if the stream name is correct + if stream["streamName"] != stream_name: + print(f"Error: Incorrect stream name {stream['streamName']}") + return False + + # Check if the events are correct + for event in stream["events"]: + # Check if the event has the required fields: tableName, eventType, eventTime, windowId, windowType + if "tableName" not in event: + print(f"Error: Missing 'tableName' in event {event}") + return False + if "eventType" not in event: + print(f"Error: Missing 'eventType' in event {event}") + return False + if "eventTime" not in event: + print(f"Error: Missing 'eventTime' in event {event}") + return False + if "windowId" not in event: + print(f"Error: Missing 'windowId' in event {event}") + return False + if "windowType" not in event: + print(f"Error: Missing 'windowType' in event {event}") + return False + if event["eventType"] not in [ + "WINDOW_OPEN", + "WINDOW_CLOSE", + "WINDOW_INVALIDATION", + ]: + print(f"Error: Invalid event type {event['eventType']}") + return False + if event["windowType"] not in [ + "Time", + "State", + "Session", + "Event", + "Count", + ]: + print(f"Error: Invalid window type {event['windowType']}") + return False + + if event["eventType"] == "WINDOW_INVALIDATION": + if not has_close: + print(f"Error: WINDOW_INVALIDATION event is not allowed") + return False + # WINDOW_INVALIDATION must have fields: windowStart, windowEnd + if "windowStart" not in event: + print(f"Error: Missing 'windowStart' in event {event}") + return False + if "windowEnd" not in event: + print(f"Error: Missing 'windowEnd' in event {event}") + return False + events_map.pop( + (event["tableName"], event["windowId"]), None + ) + continue + + # Get the event from the event map; if it doesn't exist, create a new one + e = events_map.get((event["tableName"], event["windowId"])) + if e is None: + events_map[(event["tableName"], event["windowId"])] = { + "opened": False, + "closed": False, + "wstart": 0, + "wend": 0, + } + e = events_map.get((event["tableName"], event["windowId"])) + + if event["eventType"] == "WINDOW_OPEN": + if not has_open: + print(f"Error: WINDOW_OPEN event is not allowed") + return False + # WINDOW_OPEN for all windows must have field: windowStart + if "windowStart" not in event: + print(f"Error: Missing 'windowStart' in event {event}") + return False + if event["windowType"] == "State": + # WINDOW_OPEN for State window must also have fields: prevState, curState + if "prevState" not in event: + print( + f"Error: Missing 'prevState' in event {event}" + ) + return False + if "curState" not in event: + print(f"Error: Missing 'curState' in event {event}") + return False + elif event["windowType"] == "Event": + # WINDOW_OPEN for Event window must also have fields: triggerCondition + if "triggerCondition" not in event: + print( + f"Error: Missing 'triggerCondition' in event {event}" + ) + return False + e["opened"] = True + e["wstart"] = event["windowStart"] + elif event["eventType"] == "WINDOW_CLOSE": + if not has_close: + print(f"Error: WINDOW_CLOSE event is not allowed") + return False + # WINDOW_CLOSE for all windows must have fields: windowStart, windowEnd, result + if "windowStart" not in event: + print(f"Error: Missing 'windowStart' in event {event}") + return False + if "windowEnd" not in event: + print(f"Error: Missing 'windowEnd' in event {event}") + return False + if "result" not in event: + print(f"Error: Missing 'result' in event {event}") + return False + if event["windowType"] == "State": + # WINDOW_CLOSE for State window must also have fields: curState, nextState + if "curState" not in event: + print(f"Error: Missing 'curState' in event {event}") + return False + if "nextState" not in event: + print( + f"Error: Missing 'nextState' in event {event}" + ) + return False + elif event["windowType"] == "Event": + # WINDOW_CLOSE for Event window must also have fields: triggerCondition + if "triggerCondition" not in event: + print( + f"Error: Missing 'triggerCondition' in event {event}" + ) + return False + e["closed"] = True + e["wstart"] = event["windowStart"] + e["wend"] = event["windowEnd"] + + # Collect all the windows that closed + windows_map = {} + for k, v in events_map.items(): + if not v["closed"]: + continue + e = windows_map.get(k[0]) + if e is None: + windows_map[k[0]] = [] + e = windows_map.get(k[0]) + e.append((v["wstart"], v["wend"])) + + # Sort the windows by start time + for k, v in windows_map.items(): + v.sort(key=lambda x: x[0]) + + # Write all collected window info to the specified output file in sorted order as csv format + with open(out_file, "w", encoding="utf-8") as f: + f.write("wstart,wend,tbname\n") + for k, v in sorted(windows_map.items()): + for w in v: + f.write(f"{w[0]},{w[1]},\"{k}\"\n") + return True + + def check_notify_result(self): + all_right = True + for stream in self.streams: + if len(stream["notify_server"]) == 0 or not "window_close" in self.notify_event: + continue + query_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}.csv" + query_sql = f"select cast(_wstart as bigint) as wstart, cast(_wend as bigint) as wend, tbname from test.{stream['dest_table']} order by tbname, wstart >> {query_file};" + tdLog.info("query_sql: " + query_sql) + os.system(f"taos -c {tdCom.getClientCfgPath()} -s '{query_sql}'") + for i in range(self.num_addr_per_stream): + server = stream["notify_server"][i] + parse_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.csv" + if os.path.exists(parse_file): + os.remove(parse_file) + if not self.parse(f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.log", parse_file, stream["stream_name"]): + tdLog.exit(f"Error: {stream['stream_name']}_{i} parse notify result failed") + # Compare the result using diff command + diff_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.diff" + if os.path.exists(diff_file): + os.remove(diff_file) + os.system(f"diff --strip-trailing-cr {query_file} {parse_file} > {diff_file}") + if os.path.getsize(diff_file) != 0: + tdLog.info(f"Error: {stream['stream_name']}_{i} notify result is not correct") + all_right = False + if not all_right: + raise Exception("Error: notify result is not correct") + + def drop_all_streams(self): + for stream in self.streams: + tdSql.execute(f"drop stream if exists {stream['stream_name']};") + # Also remove all generaetd files + query_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}.csv" + if os.path.exists(query_file): + os.remove(query_file) + for i in range(self.num_addr_per_stream): + log_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.log" + parse_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.csv" + diff_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.diff" + if os.path.exists(log_file): + os.remove(log_file) + if os.path.exists(parse_file): + os.remove(parse_file) + if os.path.exists(diff_file): + os.remove(diff_file) + + def run(self): + tdLog.info(f"Start to execute TestStreamNotifySinglePass({self.num_addr_per_stream}, {self.trigger_mode}, {self.notify_event}, {self.disorder})") + self.gen_streams() + self.create_streams() + self.insert_data() + self.wait_all_streams_done() + self.check_notify_result() + self.drop_all_streams() + tdLog.info(f"TestStreamNotifySinglePass({self.num_addr_per_stream}, {self.trigger_mode}, {self.notify_event}, {self.disorder}) successfully executed") + +class TDTestCase(TBase): + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def run(self): + # Disable many tests due to long execution time + + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="AT_ONCE", notify_event="'window_open', 'window_close'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="MAX_DELAY 10s", notify_event="'window_open', 'window_close'", disorder=False).run() + TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=False).run() + + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="AT_ONCE", notify_event="'window_open', 'window_close'", disorder=True).run() + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="MAX_DELAY 10s", notify_event="'window_open', 'window_close'", disorder=True).run() + TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=True).run() + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=True).run() + + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="AT_ONCE", notify_event="'window_close'", disorder=False).run() + TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="MAX_DELAY 10s", notify_event="'window_close'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="WINDOW_CLOSE", notify_event="'window_close'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_close'", disorder=False).run() + + TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="AT_ONCE", notify_event="'window_open'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="MAX_DELAY 10s", notify_event="'window_open'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="WINDOW_CLOSE", notify_event="'window_open'", disorder=False).run() + # TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_open'", disorder=False).run() + + def stop(self): + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/parallel_test/longtimeruning_cases.task b/tests/parallel_test/longtimeruning_cases.task index 1034771159..a4230882a9 100644 --- a/tests/parallel_test/longtimeruning_cases.task +++ b/tests/parallel_test/longtimeruning_cases.task @@ -10,6 +10,7 @@ # army-test #,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2 +,,y,army,./pytest.sh python3 ./test.py -f stream/test_stream_notify.py #tsim test #,,y,script,./test.sh -f tsim/query/timeline.sim diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 5bdb744f6c..4874174f38 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(313, 314)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(314, 315)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(61, len(tdSql.queryResult)) diff --git a/tools/taos-tools/src/benchUtil.c b/tools/taos-tools/src/benchUtil.c index ad59c4e37e..8dcca30f8c 100644 --- a/tools/taos-tools/src/benchUtil.c +++ b/tools/taos-tools/src/benchUtil.c @@ -1033,18 +1033,26 @@ int convertStringToDatatype(char *type, int length) { return TSDB_DATA_TYPE_BOOL; } else if (0 == strCompareN(type, "tinyint", length)) { return TSDB_DATA_TYPE_TINYINT; + } else if (0 == strCompareN(type, "tinyint unsigned", length)) { + return TSDB_DATA_TYPE_UTINYINT; } else if (0 == strCompareN(type, "utinyint", length)) { return TSDB_DATA_TYPE_UTINYINT; } else if (0 == strCompareN(type, "smallint", length)) { return TSDB_DATA_TYPE_SMALLINT; + } else if (0 == strCompareN(type, "smallint unsigned", length)) { + return TSDB_DATA_TYPE_USMALLINT; } else if (0 == strCompareN(type, "usmallint", length)) { return TSDB_DATA_TYPE_USMALLINT; } else if (0 == strCompareN(type, "int", length)) { return TSDB_DATA_TYPE_INT; + } else if (0 == strCompareN(type, "int unsigned", length)) { + return TSDB_DATA_TYPE_UINT; } else if (0 == strCompareN(type, "uint", length)) { return TSDB_DATA_TYPE_UINT; } else if (0 == strCompareN(type, "bigint", length)) { return TSDB_DATA_TYPE_BIGINT; + } else if (0 == strCompareN(type, "bigint unsigned", length)) { + return TSDB_DATA_TYPE_UBIGINT; } else if (0 == strCompareN(type, "ubigint", length)) { return TSDB_DATA_TYPE_UBIGINT; } else if (0 == strCompareN(type, "float", length)) {