test: add csv interlace case

Yaming Pei 2025-03-06 09:01:02 +08:00
parent 1b2afe31ed
commit 59ef7cd3be
1 changed file with 153 additions and 40 deletions


@@ -13,6 +13,8 @@
import os
import json
import csv
import shutil
import datetime
import frame
import frame.etool
@@ -26,80 +27,192 @@ from frame import *
class TDTestCase(TBase):
    def caseDescription(self):
        """
        [TS-5089] taosBenchmark support exporting csv
        """

    def clear_directory(self, target_dir: str = 'csv'):
        try:
            if not os.path.exists(target_dir):
                return
            for entry in os.listdir(target_dir):
                entry_path = os.path.join(target_dir, entry)
                if os.path.isfile(entry_path) or os.path.islink(entry_path):
                    os.unlink(entry_path)
                else:
                    shutil.rmtree(entry_path)
            tdLog.debug("clear succeeded, dir: %s" % (target_dir))
        except OSError as e:
            tdLog.exit("clear failed, dir: %s, error: %s" % (target_dir, e))

    # convert a millisecond timestamp to a time string in ts_format
    def convert_timestamp(self, ts, ts_format):
        dt_object = datetime.datetime.fromtimestamp(ts / 1000)
        formatted_time = dt_object.strftime(ts_format)
        return formatted_time

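    # For example (hypothetical values): convert_timestamp(1700000000000, "%Y%m%d")
    # formats the instant 2023-11-14 22:13:20 UTC in the local timezone, so it
    # returns "20231115" on a UTC+8 host; fromtimestamp() is timezone-dependent.
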
    # split [total_start_ts, total_end_ts) into ts_interval-sized slices
    def calc_time_slice_partitions(self, total_start_ts, total_end_ts, ts_step, childs, ts_format, ts_interval):
        # ts_interval is assumed to be in days, e.g. "1d"
        interval_days = int(ts_interval[:-1])
        n_days_millis = interval_days * 24 * 60 * 60 * 1000

        # align the first slice to the start of its ts_format unit
        dt_start = datetime.datetime.fromtimestamp(total_start_ts / 1000.0)
        formatted_str = dt_start.strftime(ts_format)
        s0_dt = datetime.datetime.strptime(formatted_str, ts_format)
        s0 = int(s0_dt.timestamp() * 1000)

        partitions = []
        current_s = s0
        while current_s <= total_end_ts:
            current_end = current_s + n_days_millis
            start_actual = max(current_s, total_start_ts)
            end_actual = min(current_end, total_end_ts)
            if start_actual >= end_actual:
                count = 0
            else:
                # rows per child table in this slice, rounded up
                delta = end_actual - start_actual
                if delta % ts_step:
                    count = delta // ts_step + 1
                else:
                    count = delta // ts_step
            count *= childs
            partitions.append({
                "start_ts": current_s,
                "end_ts": current_end,
                "start_time": self.convert_timestamp(current_s, ts_format),
                "end_time": self.convert_timestamp(current_end, ts_format),
                "count": count
            })
            current_s += n_days_millis

        # partitions = [p for p in partitions if p['count'] > 0]
        return partitions

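    # A worked sketch of the slice math (hypothetical values): with
    # ts_step=1000 (one row per second per child), childs=2, ts_interval="1d"
    # and ts_format="%Y%m%d", a range spanning a day and a half produces two
    # partitions; each partition's count is
    #   ceil((end_actual - start_actual) / ts_step) * childs
    # i.e. the rounded-up rows per child in the slice times the child count.
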
    # check that an interlaced csv file is correct
    def check_stb_csv_correct(self, csv_file_name, all_rows, interlace_rows):
        # open as csv
        tbname_idx = 14
        count = 0
        batch = 0
        name = ""
        header = True
        with open(csv_file_name) as file:
            rows = csv.reader(file)
            for row in rows:
                # skip the header line
                if header:
                    header = False
                    continue
                # interlace_rows
                if name == "":
                    name = row[tbname_idx]
                    batch = 1
                else:
                    if name == row[tbname_idx]:
                        batch += 1
                    else:
                        # switch to another child table
                        if batch != interlace_rows:
                            tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
                        batch = 1
                        name = row[tbname_idx]
                count += 1
        # check the last batch
        if batch != interlace_rows:
            tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
        # check all rows
        if count != all_rows:
            tdLog.exit(f"all_rows invalid. actual={count} expected={all_rows} csv_file_name={csv_file_name}")
        tdLog.info(f"Check generated csv file successfully. csv_file_name={csv_file_name} count={count} interlace_rows={batch}")

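    # For example (hypothetical data): with interlace_rows=2 and two child
    # tables d0/d1, the tbname column (index 14) is expected to read
    #   d0, d0, d1, d1, d0, d0, d1, d1, ...
    # every run of equal tbname values must have length interlace_rows, and
    # the number of data rows must equal all_rows.
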
    # check correct
    def check_stb_correct(self, data, db, stb):
        filepath = data["output_path"]
        stb_name = stb["name"]
        childs = stb["childtable_to"] - stb["childtable_from"]
        insert_rows = stb["insert_rows"]
        interlace_rows = stb["interlace_rows"]
        csv_file_prefix = stb["csv_file_prefix"]
        csv_ts_format = stb.get("csv_ts_format", None)
        csv_ts_interval = stb.get("csv_ts_interval", None)
        ts_step = stb["timestamp_step"]
        total_start_ts = stb["start_timestamp"]
        total_end_ts = total_start_ts + ts_step * insert_rows
        all_rows = childs * insert_rows

        if interlace_rows > 0:
            # interlace mode
            if not csv_ts_format:
                # single csv file
                csv_file_name = f"{filepath}{csv_file_prefix}.csv"
                self.check_stb_csv_correct(csv_file_name, all_rows, interlace_rows)
            else:
                # one csv file per time slice
                partitions = self.calc_time_slice_partitions(total_start_ts, total_end_ts, ts_step, childs, csv_ts_format, csv_ts_interval)
                for part in partitions:
                    csv_file_name = f"{filepath}{csv_file_prefix}_{part['start_time']}_{part['end_time']}.csv"
                    self.check_stb_csv_correct(csv_file_name, part['count'], interlace_rows)
        else:
            # batch mode: rows are written table by table
            interlace_rows = insert_rows
            if not csv_ts_format:
                # single csv file: not checked yet
                pass
            else:
                # one csv file per time slice: not checked yet
                pass

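    # For example (hypothetical values): with csv_file_prefix="data",
    # csv_ts_format="%Y%m%d" and a slice covering one day, the file checked
    # above is "<output_path>data_20231115_20231116.csv".
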
    # check result
    def check_result(self, jsonFile):
        # read json config
        with open(jsonFile) as file:
            data = json.load(file)
        database = data["databases"][0]
        stables = database["super_tables"]
        for stable in stables:
            # check csv content is correct
            self.check_stb_correct(data, database, stable)

    def check_export_csv(self, benchmark, jsonFile, options=""):
        # clear
        self.clear_directory()
        # exec
        cmd = f"{benchmark} {options} -f {jsonFile}"
        os.system(cmd)
        # check result
        self.check_result(jsonFile)

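    # The json config is assumed to follow taosBenchmark's csv-export layout;
    # a minimal sketch with the field names read by the checks above (all
    # values hypothetical):
    #   {
    #     "output_path": "./csv/",
    #     "databases": [{
    #       "dbinfo": {"name": "csvdb"},
    #       "super_tables": [{
    #         "name": "meters",
    #         "childtable_from": 0, "childtable_to": 10,
    #         "insert_rows": 100, "interlace_rows": 5,
    #         "timestamp_step": 1000, "start_timestamp": 1700000000000,
    #         "csv_file_prefix": "data",
    #         "csv_ts_format": "%Y%m%d", "csv_ts_interval": "1d"
    #       }]
    #     }]
    #   }
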
    def run(self):
        # benchmark binary path
        benchmark = etool.benchMarkFile()
        # do check interlace normal
        json_file = "tools/benchmark/basic/json/csv-interlace-normal.json"
        self.check_export_csv(benchmark, json_file)

    def stop(self):
        tdSql.close()