From 59ef7cd3be5e1b07eefa241c2698786f7fd02b54 Mon Sep 17 00:00:00 2001
From: Yaming Pei
Date: Thu, 6 Mar 2025 09:01:02 +0800
Subject: [PATCH] test: add csv interlace case

---
 tests/army/tools/benchmark/basic/exportCsv.py | 193 ++++++++++++++----
 1 file changed, 153 insertions(+), 40 deletions(-)

diff --git a/tests/army/tools/benchmark/basic/exportCsv.py b/tests/army/tools/benchmark/basic/exportCsv.py
index b8b3828ea6..fececcf7f2 100644
--- a/tests/army/tools/benchmark/basic/exportCsv.py
+++ b/tests/army/tools/benchmark/basic/exportCsv.py
@@ -13,6 +13,8 @@
 import os
 import json
 import csv
+import datetime
+import shutil
 
 import frame
 import frame.etool
@@ -26,80 +28,191 @@ from frame import *
 
 class TDTestCase(TBase):
     def caseDescription(self):
         """
-        [TD-11510] taosBenchmark test cases
-        """
-    # check correct
-    def checkCorrect(self, csvFile, allRows, interlaceRows):
-        # open as csv
-        count = 0
-        batch = 0
-        name = ""
-        with open(csvFile) as file:
+        [TS-5089] taosBenchmark support exporting csv
+        """
+
+
+    def clear_directory(self, target_dir: str = 'csv'):
+        try:
+            if not os.path.exists(target_dir):
+                return
+            for entry in os.listdir(target_dir):
+                entry_path = os.path.join(target_dir, entry)
+                if os.path.isfile(entry_path) or os.path.islink(entry_path):
+                    os.unlink(entry_path)
+                else:
+                    shutil.rmtree(entry_path)
+
+            tdLog.debug("clear succ, dir: %s" % (target_dir))
+        except OSError as e:
+            tdLog.exit("clear fail, dir: %s, err: %s" % (target_dir, e))
+
+
+    def convert_timestamp(self, ts, ts_format):
+        dt_object = datetime.datetime.fromtimestamp(ts / 1000)
+        formatted_time = dt_object.strftime(ts_format)
+        return formatted_time
+
+
+    def calc_time_slice_partitions(self, total_start_ts, total_end_ts, ts_step, childs, ts_format, ts_interval):
+        interval_days = int(ts_interval[:-1])
+        n_days_millis = interval_days * 24 * 60 * 60 * 1000
+
+        dt_start = datetime.datetime.fromtimestamp(total_start_ts / 1000.0)
+        formatted_str = dt_start.strftime(ts_format)
+        s0_dt = datetime.datetime.strptime(formatted_str, ts_format)
+        s0 = int(s0_dt.timestamp() * 1000)
+
+        partitions = []
+        current_s = s0
+
+        while current_s <= total_end_ts:
+            current_end = current_s + n_days_millis
+            start_actual = max(current_s, total_start_ts)
+            end_actual = min(current_end, total_end_ts)
+
+            if start_actual >= end_actual:
+                count = 0
+            else:
+                delta = end_actual - start_actual
+                delta_start = start_actual - total_start_ts
+                delta_end = end_actual - total_start_ts
+                if delta % ts_step:
+                    count = delta // ts_step + 1
+                else:
+                    count = delta // ts_step
+
+                count *= childs
+
+            partitions.append({
+                "start_ts": current_s,
+                "end_ts": current_end,
+                "start_time": self.convert_timestamp(current_s, ts_format),
+                "end_time": self.convert_timestamp(current_end, ts_format),
+                "count": count
+            })
+
+            current_s += n_days_millis
+
+        # partitions = [p for p in partitions if p['count'] > 0]
+        return partitions
+
+
+    def check_stb_csv_correct(self, csv_file_name, all_rows, interlace_rows):
+        # open as csv
+        tbname_idx = 14
+        count = 0
+        batch = 0
+        name = ""
+        header = True
+        with open(csv_file_name) as file:
             rows = csv.reader(file)
             for row in rows:
-                # interlaceRows
+                if header:
+                    header = False
+                    continue
+
+                # interlace_rows
                 if name == "":
-                    name = row[0]
+                    name = row[tbname_idx]
                     batch = 1
                 else:
-                    if name == row[0]:
+                    if name == row[tbname_idx]:
                         batch += 1
                     else:
                         # switch to another child table
-                        if batch != interlaceRows:
-                            tdLog.exit(f"interlaceRows invalid. tbName={name} real={batch} expect={interlaceRows} i={count} csvFile={csvFile}")
+                        if batch != interlace_rows:
+                            tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
                         batch = 1
-                        name = row[0]
+                        name = row[tbname_idx]
                 # count ++
                 count += 1
 
         # batch
-        if batch != interlaceRows:
-            tdLog.exit(f"interlaceRows invalid. tbName={name} real={batch} expect={interlaceRows} i={count} csvFile={csvFile}")
+        if batch != interlace_rows:
+            tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
         # check all rows
-        if count != allRows:
-            tdLog.exit(f"allRows invalid. real={count} expect={allRows} csvFile={csvFile}")
+        if count != all_rows:
+            tdLog.exit(f"all_rows invalid. actual={count} expected={all_rows} csv_file_name={csv_file_name}")
 
-        tdLog.info(f"Check generate csv file successfully. csvFile={csvFile} count={count} interlaceRows={batch}")
+        tdLog.info(f"Check generated csv file successfully. csv_file_name={csv_file_name} count={count} interlace_rows={batch}")
+
+
+    # check correct
+    def check_stb_correct(self, data, db, stb):
+        filepath = data["output_path"]
+        stbName = stb["name"]
+        childs = stb["childtable_to"] - stb["childtable_from"]
+        insert_rows = stb["insert_rows"]
+        interlace_rows = stb["interlace_rows"]
+        csv_file_prefix = stb["csv_file_prefix"]
+        csv_ts_format = stb.get("csv_ts_format", None)
+        csv_ts_interval = stb.get("csv_ts_interval", None)
+
+        ts_step = stb["timestamp_step"]
+        total_start_ts = stb["start_timestamp"]
+        total_end_ts = total_start_ts + ts_step * insert_rows
+
+        all_rows = childs * insert_rows
+        if interlace_rows > 0:
+            # interlace
+            if not csv_ts_format:
+                # normal
+                csv_file_name = f"{filepath}{csv_file_prefix}.csv"
+                self.check_stb_csv_correct(csv_file_name, all_rows, interlace_rows)
+            else:
+                # time slice
+                partitions = self.calc_time_slice_partitions(total_start_ts, total_end_ts, ts_step, childs, csv_ts_format, csv_ts_interval)
+                for part in partitions:
+                    csv_file_name = f"{filepath}{csv_file_prefix}_{part['start_time']}_{part['end_time']}.csv"
+                    self.check_stb_csv_correct(csv_file_name, part['count'], interlace_rows)
+        else:
+            # batch
+            interlace_rows = insert_rows
+            if not csv_ts_format:
+                # normal
+                pass
+            else:
+                # time slice
+                pass
 
     # check result
-    def checResult(self, jsonFile):
+    def check_result(self, jsonFile):
         # csv
         with open(jsonFile) as file:
             data = json.load(file)
 
-        # read json
+        # read json
         database = data["databases"][0]
-        out = data["csvPath"]
-        dbName = database["dbinfo"]["name"]
         stables = database["super_tables"]
-        for stable in stables:
-            stbName = stable["name"]
-            childs = stable["childtable_count"]
-            insertRows = stable["insert_rows"]
-            interlaceRows = stable["interlace_rows"]
-            csvFile = f"{out}{dbName}-{stbName}.csv"
-            rows = childs * insertRows
-            if interlaceRows == 0:
-                interlaceRows = insertRows
-            # check csv context correct
-            self.checkCorrect(csvFile, rows, interlaceRows)
 
-    def checkExportCsv(self, benchmark, jsonFile, options=""):
+        for stable in stables:
+            # check csv content correct
+            self.check_stb_correct(data, database, stable)
+
+
+    def check_export_csv(self, benchmark, jsonFile, options=""):
+        # clear
+        self.clear_directory()
+
         # exec
         cmd = f"{benchmark} {options} -f {jsonFile}"
         os.system(cmd)
 
         # check result
-        self.checResult(jsonFile)
+        self.check_result(jsonFile)
+
 
     def run(self):
         # path
         benchmark = etool.benchMarkFile()
 
-        # do check
-        json = "tools/benchmark/basic/json/exportCsv.json"
"tools/benchmark/basic/json/exportCsv.json" - self.checkExportCsv(benchmark, json) + # do check interlace normal + json = "tools/benchmark/basic/json/csv-interlace-normal.json" + self.check_export_csv(benchmark, json) def stop(self): tdSql.close()