Merge pull request #30132 from taosdata/enh/TS-5089

test: add more test cases for taosBenchmark csv export
This commit is contained in:
Linhe Huo 2025-03-14 09:16:49 +08:00 committed by GitHub
commit 67e5d5b7d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 266 additions and 19 deletions

View File

@ -18,6 +18,8 @@ import datetime
import random
import copy
import json
import tempfile
import uuid
import frame.eos
import frame.etool
@ -477,3 +479,48 @@ class TBase:
print(rlist)
return rlist
# generate new json file
def genNewJson(self, jsonFile, modifyFunc=None):
try:
with open(jsonFile, 'r', encoding='utf-8') as f:
data = json.load(f)
except FileNotFoundError:
tdLog.info(f"the specified json file '{jsonFile}' was not found.")
return None
except Exception as e:
tdLog.info(f"error reading the json file: {e}")
return None
if callable(modifyFunc):
modifyFunc(data)
tempDir = os.path.join(tempfile.gettempdir(), 'json_templates')
try:
os.makedirs(tempDir, exist_ok=True)
except PermissionError:
tdLog.info(f"no sufficient permissions to create directory at '{tempDir}'.")
return None
except Exception as e:
tdLog.info(f"error creating temporary directory: {e}")
return None
tempPath = os.path.join(tempDir, f"temp_{uuid.uuid4().hex}.json")
try:
with open(tempPath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
tdLog.info(f"error writing to temporary json file: {e}")
return None
tdLog.info(f"create temporary json file successfully, file: {tempPath}")
return tempPath
# delete file
def deleteFile(self, filename):
try:
if os.path.exists(filename):
os.remove(filename)
except Exception as err:
raise Exception(err)

View File

@ -121,20 +121,21 @@ class TDTestCase(TBase):
else:
# switch to another child table
if batch != interlace_rows:
tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
tdLog.exit(f"interlace rows is not as expected. tbname={name}, actual: {batch}, expected: {interlace_rows}, count: {count}, csv_file_name: {csv_file_name}")
batch = 1
name = row[tbname_idx]
# count ++
count += 1
# batch
if batch != interlace_rows:
tdLog.exit(f"interlace_rows invalid. tbName={name} actual={batch} expected={interlace_rows} i={count} csv_file_name={csv_file_name}")
tdLog.exit(f"interlace rows is not as expected. tbname={name}, actual: {batch}, expected: {interlace_rows}, count: {count}, csv_file_name: {csv_file_name}")
# check all rows
if count != all_rows:
tdLog.exit(f"all_rows invalid. actual={count} expected={all_rows} csv_file_name={csv_file_name}")
tdLog.exit(f"total rows is not as expected. actual: {count}, expected: {all_rows}, csv_file_name: {csv_file_name}")
tdLog.info(f"Check generate csv file successfully. csv_file_name={csv_file_name} count={count} interlace_rows={batch}")
tdLog.info(f"check generate csv file successfully. csv_file_name: {csv_file_name}, count: {count}, interlace_rows: {interlace_rows}")
# check correct
@ -194,9 +195,9 @@ class TDTestCase(TBase):
# check result
def check_result(self, jsonFile):
def check_result(self, json_file):
# csv
with open(jsonFile) as file:
with open(json_file) as file:
data = json.load(file)
# read json
@ -208,25 +209,175 @@ class TDTestCase(TBase):
self.check_stb_correct(data, database, stable)
def check_export_csv(self, benchmark, jsonFile, options=""):
def exec_benchmark(self, benchmark, json_file, options=""):
cmd = f"{benchmark} {options} -f {json_file}"
eos.exe(cmd)
def check_export_csv_main(self, benchmark, json_file, options=""):
# clear
self.clear_directory()
# exec
cmd = f"{benchmark} {options} -f {jsonFile}"
eos.exe(cmd)
self.exec_benchmark(benchmark, json_file, options)
# check result
self.check_result(jsonFile)
self.check_result(json_file)
def check_export_csv_others(self, benchmark, json_file, options=""):
# clear
self.clear_directory()
# file ts interval second
new_json_file = self.genNewJson(json_file, self.func_csv_ts_interval_second)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/data_20231115061320_20231115061321.csv", 10001)
self.deleteFile(new_json_file)
# file ts interval minute
new_json_file = self.genNewJson(json_file, self.func_csv_ts_interval_minute)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/data_202311150613_202311150614.csv", 10001)
self.deleteFile(new_json_file)
# file ts interval hour
new_json_file = self.genNewJson(json_file, self.func_csv_ts_interval_hour)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/data_2023111506_2023111507.csv", 10001)
self.deleteFile(new_json_file)
# db precision us
new_json_file = self.genNewJson(json_file, self.func_db_precision_us)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/data_20231115_20231116.csv", 10001)
self.deleteFile(new_json_file)
# db precision ns
new_json_file = self.genNewJson(json_file, self.func_db_precision_ns)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/data_20231115_20231116.csv", 10001)
self.deleteFile(new_json_file)
# thread num
new_json_file = self.genNewJson(json_file, self.func_thread_num)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/data_10.csv", 1001)
self.deleteFile(new_json_file)
# create sql
new_json_file = self.genNewJson(json_file, self.func_create_sql)
self.exec_benchmark(benchmark, new_json_file, options)
self.check_file_line_count("./csv/create_stmt.txt", 2)
self.deleteFile(new_json_file)
# gzip
new_json_file = self.genNewJson(json_file, self.func_gzip)
self.exec_benchmark(benchmark, new_json_file, options)
eos.exe("gunzip ./csv/data.csv.gz")
self.check_file_line_count("./csv/data.csv", 10001)
self.deleteFile(new_json_file)
def func_csv_ts_interval_second(self, data):
db = data['databases'][0]
stb = db["super_tables"][0]
stb['timestamp_step'] = '10'
stb['csv_ts_format'] = '%Y%m%d%H%M%S'
stb['csv_ts_interval'] = '1s'
def func_csv_ts_interval_minute(self, data):
db = data['databases'][0]
stb = db["super_tables"][0]
stb['timestamp_step'] = '600'
stb['csv_ts_format'] = '%Y%m%d%H%M'
stb['csv_ts_interval'] = '1m'
def func_csv_ts_interval_hour(self, data):
db = data['databases'][0]
stb = db["super_tables"][0]
stb['timestamp_step'] = '36000'
stb['csv_ts_format'] = '%Y%m%d%H'
stb['csv_ts_interval'] = '1h'
def func_db_precision_us(self, data):
db = data['databases'][0]
db['dbinfo']['precision'] = 'us'
stb = db["super_tables"][0]
stb['start_timestamp'] = 1700000000000000
def func_db_precision_ns(self, data):
db = data['databases'][0]
db['dbinfo']['precision'] = 'ns'
stb = db["super_tables"][0]
stb['start_timestamp'] = 1700000000000000000
def func_thread_num(self, data):
data['thread_count'] = 12
db = data['databases'][0]
stb = db["super_tables"][0]
stb.pop('interlace_rows', None)
stb.pop('csv_ts_format', None)
stb.pop('csv_ts_interval', None)
def func_create_sql(self, data):
db = data['databases'][0]
dbinfo = db['dbinfo']
dbinfo['buffer'] = 256
dbinfo['cachemode'] = 'none'
stb = db["super_tables"][0]
stb['primary_key'] = 1
stb['columns'][0] = { "type": "bool", "name": "bc", "encode": 'simple8b', 'compress': 'lz4', 'level': 'medium'}
stb['comment'] = "csv export sample"
stb['delay'] = 10
stb['file_factor'] = 20
stb['rollup'] = 'min'
stb['max_delay'] = '300s'
stb['watermark'] = '10m'
stb['columns'][1] = { "type": "float", "name": "fc", "min": 1, "sma": "yes"}
stb['columns'][2] = { "type": "double", "name": "dc", "min":10, "max":10, "sma": "yes"}
def func_gzip(self, data):
db = data['databases'][0]
stb = db["super_tables"][0]
stb.pop('csv_ts_format', None)
stb.pop('csv_ts_interval', None)
stb['csv_compress_level'] = "fast"
def check_file_line_count(self, filename, expected_lines):
try:
with open(filename, 'r', encoding='utf-8') as file:
actual_lines = sum(1 for line in file)
if expected_lines >= 0:
is_correct = actual_lines == expected_lines
if not is_correct:
tdLog.exit(f"check csv data failed, actual: {actual_lines}, expected: {expected_lines}, filename: {filename}")
except FileNotFoundError:
tdLog.exit(f"check csv data failed, file not exists. filename: {filename}")
def run(self):
# path
benchmark = etool.benchMarkFile()
# do check interlace normal
json = "tools/benchmark/basic/json/csv-export.json"
self.check_export_csv(benchmark, json)
# check normal
json_file = "tools/benchmark/basic/json/csv-export.json"
self.check_export_csv_main(benchmark, json_file)
# check others
json_file = "tools/benchmark/basic/json/csv-export-template.json"
self.check_export_csv_others(benchmark, json_file)
def stop(self):
tdSql.close()

View File

@ -0,0 +1,54 @@
{
"filetype": "csvfile",
"output_dir": "./csv",
"databases": [
{
"dbinfo": {
"name": "csvdb",
"precision": "ms"
},
"super_tables": [
{
"name": "stb",
"childtable_count": 1010,
"insert_rows": 1000,
"interlace_rows": 1,
"childtable_prefix": "d",
"timestamp_step": 1000000,
"start_timestamp":1700000000000,
"childtable_from": 1000,
"childtable_to": 1010,
"csv_file_prefix": "data",
"csv_ts_format": "%Y%m%d",
"csv_ts_interval": "1d",
"csv_output_header": "yes",
"csv_tbname_alias": "device_id",
"csv_compress_level": "none",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc", "min": 1},
{ "type": "double", "name": "dc", "min":10, "max":10},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si"},
{ "type": "int", "name": "ic", "fillNull":"false"},
{ "type": "bigint", "name": "bi"},
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi", "min":100, "max":120},
{ "type": "uint", "name": "ui"},
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 16}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"type": "binary", "name": "location", "len": 16,
"values": ["San Francisco", "Los Angles", "San Diego",
"San Jose", "Palo Alto", "Campbell", "Mountain View",
"Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}

View File

@ -355,11 +355,6 @@ int csvGenCreateDbSql(SDataBase* db, char* buf, int size) {
pos += snprintf(buf + pos, size - pos, g_arguments->escape_character ? "`%s`" : "%s", db->dbName);
if (pos <= 0 || pos >= size) return -1;
if (-1 != g_arguments->inputted_vgroups) {
pos += snprintf(buf + pos, size - pos, " VGROUPS %d", g_arguments->inputted_vgroups);
if (pos <= 0 || pos >= size) return -1;
}
if (db->cfgs) {
for (size_t i = 0; i < db->cfgs->size; ++i) {
SDbCfg* cfg = benchArrayGet(db->cfgs, i);