feat(stream)[TS-5469]: add tests for stream event notifications
Add functional tests for different stream processing scenarios and window types. Also include tests for network failure cases to ensure robustness.
This commit is contained in:
parent
fc2d2672b9
commit
8ad3a5802d
|
@ -0,0 +1,71 @@
|
|||
{
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"thread_count": 4,
|
||||
"create_table_thread_count": 4,
|
||||
"result_file": "./insert_res.txt",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"num_of_records_per_req": 10000,
|
||||
"prepared_rand": 10000,
|
||||
"chinese": "no",
|
||||
"escape_character": "yes",
|
||||
"continue_if_fail": "no",
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
"name": "test",
|
||||
"drop": "no",
|
||||
"vgroups": 4,
|
||||
"precision": "ms"
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "st",
|
||||
"child_table_exists": "no",
|
||||
"childtable_count": 5,
|
||||
"childtable_prefix": "ct",
|
||||
"auto_create_table": "yes",
|
||||
"batch_create_tbl_num": 5,
|
||||
"data_source": "rand",
|
||||
"insert_mode": "taosc",
|
||||
"non_stop_mode": "no",
|
||||
"line_protocol": "line",
|
||||
"insert_rows": 10000,
|
||||
"childtable_limit": 0,
|
||||
"childtable_offset": 0,
|
||||
"interlace_rows": 50,
|
||||
"insert_interval": 10,
|
||||
"partial_col_num": 0,
|
||||
"timestamp_step": 500,
|
||||
"start_timestamp": "2025-01-13 17:30:00.000",
|
||||
"sample_format": "csv",
|
||||
"sample_file": "./sample.csv",
|
||||
"use_sample_ts": "no",
|
||||
"tags_file": "",
|
||||
"columns": [
|
||||
{"type": "TINYINT", "name": "c0"},
|
||||
{"type": "SMALLINT", "name": "c1"},
|
||||
{"type": "INT", "name": "c2"},
|
||||
{"type": "BIGINT", "name": "c3"},
|
||||
{"type": "DOUBLE", "name": "c4"},
|
||||
{"type": "FLOAT", "name": "c5"},
|
||||
{"type": "BOOL", "name": "c6"},
|
||||
{"type": "VARCHAR", "name": "c7", "len": 10},
|
||||
{"type": "NCHAR", "name": "c8", "len": 10},
|
||||
{"type": "UTINYINT", "name": "c9"},
|
||||
{"type": "USMALLINT", "name": "c10"},
|
||||
{"type": "UINT", "name": "c11"},
|
||||
{"type": "UBIGINT", "name": "c12"}
|
||||
],
|
||||
"tags": [
|
||||
{"type": "INT", "name": "groupid", "max": 100, "min": 1}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
{
|
||||
"filetype": "insert",
|
||||
"cfgdir": "/etc/taos",
|
||||
"host": "127.0.0.1",
|
||||
"port": 6030,
|
||||
"user": "root",
|
||||
"password": "taosdata",
|
||||
"thread_count": 4,
|
||||
"create_table_thread_count": 4,
|
||||
"result_file": "./insert_res.txt",
|
||||
"confirm_parameter_prompt": "no",
|
||||
"num_of_records_per_req": 10000,
|
||||
"prepared_rand": 10000,
|
||||
"chinese": "no",
|
||||
"escape_character": "yes",
|
||||
"continue_if_fail": "no",
|
||||
"databases": [
|
||||
{
|
||||
"dbinfo": {
|
||||
"name": "test",
|
||||
"drop": "no",
|
||||
"vgroups": 4,
|
||||
"precision": "ms"
|
||||
},
|
||||
"super_tables": [
|
||||
{
|
||||
"name": "st",
|
||||
"child_table_exists": "no",
|
||||
"childtable_count": 5,
|
||||
"childtable_prefix": "ct",
|
||||
"auto_create_table": "yes",
|
||||
"batch_create_tbl_num": 5,
|
||||
"data_source": "rand",
|
||||
"insert_mode": "taosc",
|
||||
"non_stop_mode": "no",
|
||||
"line_protocol": "line",
|
||||
"insert_rows": 10000,
|
||||
"disorder_ratio": 10,
|
||||
"childtable_limit": 0,
|
||||
"childtable_offset": 0,
|
||||
"interlace_rows": 50,
|
||||
"insert_interval": 10,
|
||||
"partial_col_num": 0,
|
||||
"timestamp_step": 500,
|
||||
"start_timestamp": "2025-01-13 17:30:00.000",
|
||||
"sample_format": "csv",
|
||||
"sample_file": "./sample.csv",
|
||||
"use_sample_ts": "no",
|
||||
"tags_file": "",
|
||||
"columns": [
|
||||
{"type": "TINYINT", "name": "c0"},
|
||||
{"type": "SMALLINT", "name": "c1"},
|
||||
{"type": "INT", "name": "c2"},
|
||||
{"type": "BIGINT", "name": "c3"},
|
||||
{"type": "DOUBLE", "name": "c4"},
|
||||
{"type": "FLOAT", "name": "c5"},
|
||||
{"type": "BOOL", "name": "c6"},
|
||||
{"type": "VARCHAR", "name": "c7", "len": 10},
|
||||
{"type": "NCHAR", "name": "c8", "len": 10},
|
||||
{"type": "UTINYINT", "name": "c9"},
|
||||
{"type": "USMALLINT", "name": "c10"},
|
||||
{"type": "UINT", "name": "c11"},
|
||||
{"type": "UBIGINT", "name": "c12"}
|
||||
],
|
||||
"tags": [
|
||||
{"type": "INT", "name": "groupid", "max": 100, "min": 1}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import asyncio
|
||||
import signal
|
||||
import websockets
|
||||
import argparse
|
||||
|
||||
stop_event = asyncio.Event()
|
||||
|
||||
async def handle_websocket(websocket, log_file):
|
||||
try:
|
||||
# Write the message to the specified log file
|
||||
if log_file != "":
|
||||
with open(log_file, "a", encoding="utf-8") as f:
|
||||
async for message in websocket:
|
||||
f.write(message + "\n")
|
||||
if stop_event.is_set():
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Connection closed with error: {e}")
|
||||
|
||||
async def listen(port, log_file):
|
||||
async with websockets.serve(
|
||||
lambda ws: handle_websocket(ws, log_file),
|
||||
"0.0.0.0",
|
||||
port,
|
||||
ping_timeout = None,
|
||||
max_size= 10 * 1024 * 1024 # 10MB,
|
||||
):
|
||||
print(f"WebSocket server listening on port {port}...")
|
||||
await stop_event.wait() # Run forever (until canceled)
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
stop_event.set()
|
||||
|
||||
if __name__ == '__main__':
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-d', '--log_file', type=str, default='stream_notify_server.log', help='log file')
|
||||
parser.add_argument('-p', '--port', type=int, default=12345, help='port number')
|
||||
args = parser.parse_args()
|
||||
|
||||
asyncio.run(listen(args.port, args.log_file))
|
|
@ -0,0 +1,473 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from frame import etool
|
||||
from frame.etool import *
|
||||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame.common import *
|
||||
import signal
|
||||
import subprocess
|
||||
|
||||
class StreamNotifyServer:
|
||||
def __init__(self):
|
||||
self.log_file = ""
|
||||
self.sub_process = None
|
||||
|
||||
def __del__(self):
|
||||
self.stop()
|
||||
|
||||
def run(self, port, log_file):
|
||||
tdLog.info(f"Start notify server: python3 {etool.curFile(__file__, 'stream_notify_server.py')} -p {port} -d {log_file}")
|
||||
self.sub_process = subprocess.Popen(['python3', etool.curFile(__file__, 'stream_notify_server.py'), '-p', str(port), '-d', log_file])
|
||||
self.log_file = log_file
|
||||
|
||||
def stop(self):
|
||||
if self.sub_process is not None:
|
||||
self.sub_process.send_signal(signal.SIGINT)
|
||||
try:
|
||||
self.sub_process.wait(60)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.sub_process.kill()
|
||||
|
||||
class TestStreamNotifySinglePass():
|
||||
def __init__(self, num_addr_per_stream, trigger_mode, notify_event, disorder):
|
||||
self.current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
self.num_addr_per_stream = num_addr_per_stream
|
||||
self.trigger_mode = trigger_mode
|
||||
self.notify_event = notify_event
|
||||
self.disorder = disorder
|
||||
self.streams = []
|
||||
self.id = ''.join(random.choices(string.ascii_letters + string.digits, k=8))
|
||||
|
||||
def is_port_in_use(self, port):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||||
return s.connect_ex(("127.0.0.1", port)) == 0
|
||||
|
||||
def gen_streams(self):
|
||||
self.streams = [
|
||||
{
|
||||
"stream_name": "s_time_par",
|
||||
"dest_table": "dst_time_par",
|
||||
"window_clause": "interval(5s)",
|
||||
"partitioned": True,
|
||||
},
|
||||
{
|
||||
"stream_name": "s_time",
|
||||
"dest_table": "dst_time",
|
||||
"window_clause": "interval(5s)",
|
||||
"partitioned": False,
|
||||
},
|
||||
{
|
||||
"stream_name": "s_state_par",
|
||||
"dest_table": "dst_state_par",
|
||||
"window_clause": "state_window(c6)",
|
||||
"partitioned": True,
|
||||
},
|
||||
{
|
||||
"stream_name": "s_session_par",
|
||||
"dest_table": "dst_session_par",
|
||||
"window_clause": "session(ts, 50a)",
|
||||
"partitioned": True,
|
||||
},
|
||||
{
|
||||
"stream_name": "s_session",
|
||||
"dest_table": "dst_session",
|
||||
"window_clause": "session(ts, 50a)",
|
||||
"partitioned": False,
|
||||
},
|
||||
{
|
||||
"stream_name": "s_event_par",
|
||||
"dest_table": "dst_event_par",
|
||||
"window_clause": "event_window start with c6 = true end with c6 = false",
|
||||
"partitioned": True,
|
||||
},
|
||||
{
|
||||
"stream_name": "s_count_par",
|
||||
"dest_table": "dst_count_par",
|
||||
"window_clause": "count_window(10)",
|
||||
"partitioned": True,
|
||||
},
|
||||
]
|
||||
# set port to random number between 10000 and 20000
|
||||
port = random.randint(10000, 20000)
|
||||
for stream in self.streams:
|
||||
stream["notify_address"] = ""
|
||||
stream["notify_server"] = []
|
||||
if self.trigger_mode == "FORCE_WINDOW_CLOSE":
|
||||
if stream["stream_name"] == "s_time" or stream["stream_name"] == "s_session":
|
||||
continue
|
||||
elif "MAX_DELAY" in self.trigger_mode or "AT_ONCE" in self.trigger_mode or self.disorder:
|
||||
if stream["stream_name"] == "s_session_par" or stream["stream_name"] == "s_state_par":
|
||||
continue
|
||||
for i in range(self.num_addr_per_stream):
|
||||
# Find an available port
|
||||
while self.is_port_in_use(port):
|
||||
port += 1
|
||||
# Start the stream notify server and add the address to the stream
|
||||
log_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.log"
|
||||
if os.path.exists(log_file):
|
||||
os.remove(log_file)
|
||||
server = StreamNotifyServer()
|
||||
server.run(port, log_file)
|
||||
stream["notify_address"] += f"'ws://127.0.0.1:{port}',"
|
||||
stream["notify_server"].append(server)
|
||||
port += 1
|
||||
stream["notify_address"] = stream["notify_address"][:-1]
|
||||
|
||||
def create_streams(self):
|
||||
tdLog.info("==========step1:create table")
|
||||
tdSql.execute("drop database if exists test;")
|
||||
tdSql.execute("create database test keep 3650;")
|
||||
tdSql.execute("use test;")
|
||||
tdSql.execute(
|
||||
f"""create stable if not exists test.st
|
||||
(ts timestamp, c0 tinyint, c1 smallint, c2 int, c3 bigint, c4 double, c5 float, c6 bool, c7 varchar(10), c8 nchar(10), c9 tinyint unsigned, c10 smallint unsigned, c11 int unsigned, c12 bigint unsigned)
|
||||
tags(groupid int);
|
||||
"""
|
||||
)
|
||||
for stream in self.streams:
|
||||
if len(stream["notify_server"]) == 0:
|
||||
continue
|
||||
stream_option = f"TRIGGER {self.trigger_mode}"
|
||||
if self.trigger_mode != "FORCE_WINDOW_CLOSE":
|
||||
stream_option += " IGNORE UPDATE 0"
|
||||
if not stream["stream_name"].startswith("s_count"):
|
||||
stream_option += " IGNORE EXPIRED 0"
|
||||
if stream["stream_name"].startswith("s_count"):
|
||||
stream_option += " WATERMARK 1a"
|
||||
stream_sql = f"""create stream {stream["stream_name"]} {stream_option} into {stream["dest_table"]} as
|
||||
select _wstart, _wend, min(c0), max(c1), min(c2), max(c3), min(c4), max(c5), first(c6), first(c7), last(c8), min(c9), max(c10), min(c11), max(c12)
|
||||
from test.st {stream["partitioned"] and "partition by tbname" or ""}
|
||||
{stream["window_clause"]}
|
||||
notify ({stream["notify_address"]}) on ({self.notify_event});
|
||||
"""
|
||||
tdSql.execute(stream_sql, show=True)
|
||||
# Wait for the stream tasks to be ready
|
||||
for i in range(50):
|
||||
tdLog.info(f"i={i} wait for stream tasks ready ...")
|
||||
time.sleep(1)
|
||||
rows = tdSql.query("select * from information_schema.ins_stream_tasks where status <> 'ready';")
|
||||
if rows == 0:
|
||||
break
|
||||
|
||||
def insert_data(self):
|
||||
tdLog.info("insert stream notify test data.")
|
||||
# taosBenchmark run
|
||||
json_file = self.disorder and "stream_notify_disorder.json" or "stream_notify.json"
|
||||
json = etool.curFile(__file__, json_file)
|
||||
etool.benchMark(json=json)
|
||||
|
||||
def wait_all_streams_done(self):
|
||||
while True:
|
||||
tdLog.info("wait for all streams done ...")
|
||||
time.sleep(10)
|
||||
rows = tdSql.query("select stream_name, level, notify_event_stat from information_schema.ins_stream_tasks where notify_event_stat is not null;")
|
||||
num_pushed = 0
|
||||
num_sent = 0
|
||||
for i in range(rows):
|
||||
tdLog.printNoPrefix(f"{tdSql.getData(i, 0)}, {tdSql.getData(i, 1)}, {tdSql.getData(i, 2)}")
|
||||
notify_event_stat = tdSql.getData(i, 2)
|
||||
match = re.search(r"Push (\d+)x, (\d+) elems", notify_event_stat)
|
||||
if match:
|
||||
num_pushed += int(match.group(2))
|
||||
match = re.search(r"Send (\d+)x, (\d+) elems", notify_event_stat)
|
||||
if match:
|
||||
num_sent += int(match.group(2))
|
||||
if num_pushed == num_sent:
|
||||
break
|
||||
tdLog.info("wait for all notify servers stop ...")
|
||||
for stream in self.streams:
|
||||
for server in stream["notify_server"]:
|
||||
server.stop()
|
||||
|
||||
def parse(self, log_file, out_file, stream_name):
|
||||
message_ids = set()
|
||||
events_map = {}
|
||||
has_open = "window_open" in self.notify_event
|
||||
has_close = "window_close" in self.notify_event
|
||||
with open(log_file, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
data = json.loads(line)
|
||||
|
||||
# Check if the data has the required fields: messageId, timestamp, streams
|
||||
if "messageId" not in data:
|
||||
print(f"Error: Missing 'messageId' in data {data}")
|
||||
return False
|
||||
if "timestamp" not in data:
|
||||
print(f"Error: Missing 'timestamp' in data {data}")
|
||||
return False
|
||||
if "streams" not in data:
|
||||
print(f"Error: Missing 'streams' in data {data}")
|
||||
return False
|
||||
|
||||
# Check if the message id is duplicated
|
||||
if message_ids.__contains__(data["messageId"]):
|
||||
print(f"Error: Duplicate message id {data['messageId']}")
|
||||
return False
|
||||
message_ids.add(data["messageId"])
|
||||
|
||||
# Check if the streams is correct
|
||||
for stream in data["streams"]:
|
||||
# Check if the stream has the required fields: streamName, events
|
||||
if "streamName" not in stream:
|
||||
print(f"Error: Missing 'streamName' in stream {stream}")
|
||||
return False
|
||||
if "events" not in stream:
|
||||
print(f"Error: Missing 'events' in stream {stream}")
|
||||
return False
|
||||
|
||||
# Check if the stream name is correct
|
||||
if stream["streamName"] != stream_name:
|
||||
print(f"Error: Incorrect stream name {stream['streamName']}")
|
||||
return False
|
||||
|
||||
# Check if the events are correct
|
||||
for event in stream["events"]:
|
||||
# Check if the event has the required fields: tableName, eventType, eventTime, windowId, windowType
|
||||
if "tableName" not in event:
|
||||
print(f"Error: Missing 'tableName' in event {event}")
|
||||
return False
|
||||
if "eventType" not in event:
|
||||
print(f"Error: Missing 'eventType' in event {event}")
|
||||
return False
|
||||
if "eventTime" not in event:
|
||||
print(f"Error: Missing 'eventTime' in event {event}")
|
||||
return False
|
||||
if "windowId" not in event:
|
||||
print(f"Error: Missing 'windowId' in event {event}")
|
||||
return False
|
||||
if "windowType" not in event:
|
||||
print(f"Error: Missing 'windowType' in event {event}")
|
||||
return False
|
||||
if event["eventType"] not in [
|
||||
"WINDOW_OPEN",
|
||||
"WINDOW_CLOSE",
|
||||
"WINDOW_INVALIDATION",
|
||||
]:
|
||||
print(f"Error: Invalid event type {event['eventType']}")
|
||||
return False
|
||||
if event["windowType"] not in [
|
||||
"Time",
|
||||
"State",
|
||||
"Session",
|
||||
"Event",
|
||||
"Count",
|
||||
]:
|
||||
print(f"Error: Invalid window type {event['windowType']}")
|
||||
return False
|
||||
|
||||
if event["eventType"] == "WINDOW_INVALIDATION":
|
||||
if not has_close:
|
||||
print(f"Error: WINDOW_INVALIDATION event is not allowed")
|
||||
return False
|
||||
# WINDOW_INVALIDATION must have fields: windowStart, windowEnd
|
||||
if "windowStart" not in event:
|
||||
print(f"Error: Missing 'windowStart' in event {event}")
|
||||
return False
|
||||
if "windowEnd" not in event:
|
||||
print(f"Error: Missing 'windowEnd' in event {event}")
|
||||
return False
|
||||
events_map.pop(
|
||||
(event["tableName"], event["windowId"]), None
|
||||
)
|
||||
continue
|
||||
|
||||
# Get the event from the event map; if it doesn't exist, create a new one
|
||||
e = events_map.get((event["tableName"], event["windowId"]))
|
||||
if e is None:
|
||||
events_map[(event["tableName"], event["windowId"])] = {
|
||||
"opened": False,
|
||||
"closed": False,
|
||||
"wstart": 0,
|
||||
"wend": 0,
|
||||
}
|
||||
e = events_map.get((event["tableName"], event["windowId"]))
|
||||
|
||||
if event["eventType"] == "WINDOW_OPEN":
|
||||
if not has_open:
|
||||
print(f"Error: WINDOW_OPEN event is not allowed")
|
||||
return False
|
||||
# WINDOW_OPEN for all windows must have field: windowStart
|
||||
if "windowStart" not in event:
|
||||
print(f"Error: Missing 'windowStart' in event {event}")
|
||||
return False
|
||||
if event["windowType"] == "State":
|
||||
# WINDOW_OPEN for State window must also have fields: prevState, curState
|
||||
if "prevState" not in event:
|
||||
print(
|
||||
f"Error: Missing 'prevState' in event {event}"
|
||||
)
|
||||
return False
|
||||
if "curState" not in event:
|
||||
print(f"Error: Missing 'curState' in event {event}")
|
||||
return False
|
||||
elif event["windowType"] == "Event":
|
||||
# WINDOW_OPEN for Event window must also have fields: triggerCondition
|
||||
if "triggerCondition" not in event:
|
||||
print(
|
||||
f"Error: Missing 'triggerCondition' in event {event}"
|
||||
)
|
||||
return False
|
||||
e["opened"] = True
|
||||
e["wstart"] = event["windowStart"]
|
||||
elif event["eventType"] == "WINDOW_CLOSE":
|
||||
if not has_close:
|
||||
print(f"Error: WINDOW_CLOSE event is not allowed")
|
||||
return False
|
||||
# WINDOW_CLOSE for all windows must have fields: windowStart, windowEnd, result
|
||||
if "windowStart" not in event:
|
||||
print(f"Error: Missing 'windowStart' in event {event}")
|
||||
return False
|
||||
if "windowEnd" not in event:
|
||||
print(f"Error: Missing 'windowEnd' in event {event}")
|
||||
return False
|
||||
if "result" not in event:
|
||||
print(f"Error: Missing 'result' in event {event}")
|
||||
return False
|
||||
if event["windowType"] == "State":
|
||||
# WINDOW_CLOSE for State window must also have fields: curState, nextState
|
||||
if "curState" not in event:
|
||||
print(f"Error: Missing 'curState' in event {event}")
|
||||
return False
|
||||
if "nextState" not in event:
|
||||
print(
|
||||
f"Error: Missing 'nextState' in event {event}"
|
||||
)
|
||||
return False
|
||||
elif event["windowType"] == "Event":
|
||||
# WINDOW_CLOSE for Event window must also have fields: triggerCondition
|
||||
if "triggerCondition" not in event:
|
||||
print(
|
||||
f"Error: Missing 'triggerCondition' in event {event}"
|
||||
)
|
||||
return False
|
||||
e["closed"] = True
|
||||
e["wstart"] = event["windowStart"]
|
||||
e["wend"] = event["windowEnd"]
|
||||
|
||||
# Collect all the windows that closed
|
||||
windows_map = {}
|
||||
for k, v in events_map.items():
|
||||
if not v["closed"]:
|
||||
continue
|
||||
e = windows_map.get(k[0])
|
||||
if e is None:
|
||||
windows_map[k[0]] = []
|
||||
e = windows_map.get(k[0])
|
||||
e.append((v["wstart"], v["wend"]))
|
||||
|
||||
# Sort the windows by start time
|
||||
for k, v in windows_map.items():
|
||||
v.sort(key=lambda x: x[0])
|
||||
|
||||
# Write all collected window info to the specified output file in sorted order as csv format
|
||||
with open(out_file, "w", encoding="utf-8") as f:
|
||||
f.write("wstart,wend,tbname\n")
|
||||
for k, v in sorted(windows_map.items()):
|
||||
for w in v:
|
||||
f.write(f"{w[0]},{w[1]},\"{k}\"\n")
|
||||
return True
|
||||
|
||||
def check_notify_result(self):
|
||||
all_right = True
|
||||
for stream in self.streams:
|
||||
if len(stream["notify_server"]) == 0 or not "window_close" in self.notify_event:
|
||||
continue
|
||||
query_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}.csv"
|
||||
query_sql = f"select cast(_wstart as bigint) as wstart, cast(_wend as bigint) as wend, tbname from test.{stream['dest_table']} order by tbname, wstart >> {query_file};"
|
||||
tdLog.info("query_sql: " + query_sql)
|
||||
os.system(f"taos -c {tdCom.getClientCfgPath()} -s '{query_sql}'")
|
||||
for i in range(self.num_addr_per_stream):
|
||||
server = stream["notify_server"][i]
|
||||
parse_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.csv"
|
||||
if os.path.exists(parse_file):
|
||||
os.remove(parse_file)
|
||||
if not self.parse(f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.log", parse_file, stream["stream_name"]):
|
||||
tdLog.exit(f"Error: {stream['stream_name']}_{i} parse notify result failed")
|
||||
# Compare the result using diff command
|
||||
diff_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.diff"
|
||||
if os.path.exists(diff_file):
|
||||
os.remove(diff_file)
|
||||
os.system(f"diff --strip-trailing-cr {query_file} {parse_file} > {diff_file}")
|
||||
if os.path.getsize(diff_file) != 0:
|
||||
tdLog.info(f"Error: {stream['stream_name']}_{i} notify result is not correct")
|
||||
all_right = False
|
||||
if not all_right:
|
||||
raise Exception("Error: notify result is not correct")
|
||||
|
||||
def drop_all_streams(self):
|
||||
for stream in self.streams:
|
||||
tdSql.execute(f"drop stream if exists {stream['stream_name']};")
|
||||
# Also remove all generaetd files
|
||||
query_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}.csv"
|
||||
if os.path.exists(query_file):
|
||||
os.remove(query_file)
|
||||
for i in range(self.num_addr_per_stream):
|
||||
log_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.log"
|
||||
parse_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.csv"
|
||||
diff_file = f"{self.current_dir}/{self.id}_{stream['stream_name']}_{i}.diff"
|
||||
if os.path.exists(log_file):
|
||||
os.remove(log_file)
|
||||
if os.path.exists(parse_file):
|
||||
os.remove(parse_file)
|
||||
if os.path.exists(diff_file):
|
||||
os.remove(diff_file)
|
||||
|
||||
def run(self):
|
||||
tdLog.info(f"Start to execute TestStreamNotifySinglePass({self.num_addr_per_stream}, {self.trigger_mode}, {self.notify_event}, {self.disorder})")
|
||||
self.gen_streams()
|
||||
self.create_streams()
|
||||
self.insert_data()
|
||||
self.wait_all_streams_done()
|
||||
self.check_notify_result()
|
||||
self.drop_all_streams()
|
||||
tdLog.info(f"TestStreamNotifySinglePass({self.num_addr_per_stream}, {self.trigger_mode}, {self.notify_event}, {self.disorder}) successfully executed")
|
||||
|
||||
class TDTestCase(TBase):
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
def run(self):
|
||||
# Disable many tests due to long execution time
|
||||
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="AT_ONCE", notify_event="'window_open', 'window_close'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="MAX_DELAY 10s", notify_event="'window_open', 'window_close'", disorder=False).run()
|
||||
TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=False).run()
|
||||
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="AT_ONCE", notify_event="'window_open', 'window_close'", disorder=True).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="MAX_DELAY 10s", notify_event="'window_open', 'window_close'", disorder=True).run()
|
||||
TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=True).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_open', 'window_close'", disorder=True).run()
|
||||
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="AT_ONCE", notify_event="'window_close'", disorder=False).run()
|
||||
TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="MAX_DELAY 10s", notify_event="'window_close'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="WINDOW_CLOSE", notify_event="'window_close'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=1, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_close'", disorder=False).run()
|
||||
|
||||
TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="AT_ONCE", notify_event="'window_open'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="MAX_DELAY 10s", notify_event="'window_open'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="WINDOW_CLOSE", notify_event="'window_open'", disorder=False).run()
|
||||
# TestStreamNotifySinglePass(num_addr_per_stream=3, trigger_mode="FORCE_WINDOW_CLOSE", notify_event="'window_open'", disorder=False).run()
|
||||
|
||||
def stop(self):
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -10,6 +10,7 @@
|
|||
|
||||
# army-test
|
||||
#,,y,army,./pytest.sh python3 ./test.py -f multi-level/mlevel_basic.py -N 3 -L 3 -D 2
|
||||
,,y,army,./pytest.sh python3 ./test.py -f stream/test_stream_notify.py
|
||||
|
||||
#tsim test
|
||||
#,,y,script,./test.sh -f tsim/query/timeline.sim
|
||||
|
|
|
@ -222,7 +222,7 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
|
||||
tdLog.info(len(tdSql.queryResult))
|
||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(313, 314))
|
||||
tdSql.checkEqual(True, len(tdSql.queryResult) in range(314, 315))
|
||||
|
||||
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
|
||||
tdSql.checkEqual(61, len(tdSql.queryResult))
|
||||
|
|
|
@ -1033,18 +1033,26 @@ int convertStringToDatatype(char *type, int length) {
|
|||
return TSDB_DATA_TYPE_BOOL;
|
||||
} else if (0 == strCompareN(type, "tinyint", length)) {
|
||||
return TSDB_DATA_TYPE_TINYINT;
|
||||
} else if (0 == strCompareN(type, "tinyint unsigned", length)) {
|
||||
return TSDB_DATA_TYPE_UTINYINT;
|
||||
} else if (0 == strCompareN(type, "utinyint", length)) {
|
||||
return TSDB_DATA_TYPE_UTINYINT;
|
||||
} else if (0 == strCompareN(type, "smallint", length)) {
|
||||
return TSDB_DATA_TYPE_SMALLINT;
|
||||
} else if (0 == strCompareN(type, "smallint unsigned", length)) {
|
||||
return TSDB_DATA_TYPE_USMALLINT;
|
||||
} else if (0 == strCompareN(type, "usmallint", length)) {
|
||||
return TSDB_DATA_TYPE_USMALLINT;
|
||||
} else if (0 == strCompareN(type, "int", length)) {
|
||||
return TSDB_DATA_TYPE_INT;
|
||||
} else if (0 == strCompareN(type, "int unsigned", length)) {
|
||||
return TSDB_DATA_TYPE_UINT;
|
||||
} else if (0 == strCompareN(type, "uint", length)) {
|
||||
return TSDB_DATA_TYPE_UINT;
|
||||
} else if (0 == strCompareN(type, "bigint", length)) {
|
||||
return TSDB_DATA_TYPE_BIGINT;
|
||||
} else if (0 == strCompareN(type, "bigint unsigned", length)) {
|
||||
return TSDB_DATA_TYPE_UBIGINT;
|
||||
} else if (0 == strCompareN(type, "ubigint", length)) {
|
||||
return TSDB_DATA_TYPE_UBIGINT;
|
||||
} else if (0 == strCompareN(type, "float", length)) {
|
||||
|
|
Loading…
Reference in New Issue