From 1c56c5ab83a78ccec3042ecc5f80e0fd304fe2b1 Mon Sep 17 00:00:00 2001
From: wangmm0220
Date: Mon, 1 Apr 2024 19:16:54 +0800
Subject: [PATCH 1/2] fix: delete the file that tests the JSON compression
 rate

---
 tests/system-test/buildJson.py | 243 ---------------------------------
 1 file changed, 243 deletions(-)
 delete mode 100644 tests/system-test/buildJson.py

diff --git a/tests/system-test/buildJson.py b/tests/system-test/buildJson.py
deleted file mode 100644
index 6e9e9f83e1..0000000000
--- a/tests/system-test/buildJson.py
+++ /dev/null
@@ -1,243 +0,0 @@
-# Write a Python script that generates a JSON string: the string is an array of length 10000, and each element is a JSON object with 4000 key-value pairs. The 4000 keys within one element are all distinct, while every element shares the same key set; keys are English words and values are ints in the range [0, 256]. Write the JSON array to a file in compact form, store it in a Parquet file, write it to an Avro file, and insert it into a PostgreSQL table with two columns: an int primary key, and a JSON column holding one array element per row.
-import csv
-import json
-import os
-import random
-import string
-import time
-
-from faker import Faker
-import pandas as pd
-import pyarrow as pa
-import pyarrow.parquet as pq
-import fastavro
-import psycopg2
-from psycopg2.extras import Json
-
-
-def get_dir_size(start_path='.'):
-    total = 0
-    for dirpath, dirs, files in os.walk(start_path):
-        for f in files:
-            fp = os.path.join(dirpath, f)
-            # get each file's size and add it to the total
-            total += os.path.getsize(fp)
-    return total
-
-
-def to_avro_record(obj):
-    return {key: value for key, value in obj.items()}
-
-
-def generate_random_string(length):
-    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
-
-
-def generate_random_values(t):
-    if t == 0:
-        return random.randint(-255, 256)
-    elif t == 1:
-        return random.randint(-2100000000, 2100000000)
-    elif t == 2:
-        return random.uniform(-10000.0, 10000.0)
-    elif t == 3:
-        return generate_random_string(10)
-    elif t == 4:
-        return random.choice([True, False])
-
-
-def generate_json_object(key_set, value_set):
-    values = [generate_random_values(t) for t in value_set]
-    return dict(zip(key_set, values))
-
-
-def generate_json_array(keys, values, array_length):
-    return [generate_json_object(keys, values) for _ in range(array_length)]
-
-
-def write_parquet_file(parquet_file, json_array):
-    df = pd.DataFrame(json_array)
-    table = pa.Table.from_pandas(df)
-    pq.write_table(table, parquet_file + ".parquet")
-
-
-def write_json_file(json_file, json_array):
-    with open(json_file + ".json", 'w') as f:
-        json.dump(json_array, f, separators=(',', ':'))
-
-
-def generate_avro_schema(k, t):
-    if t == 0:
-        return {"name": k, "type": "int", "logicalType": "int"}
-    elif t == 1:
-        return {"name": k, "type": "int", "logicalType": "int"}
-    elif t == 2:
-        return {"name": k, "type": "float"}
-    elif t == 3:
-        return {"name": k, "type": "string"}
-    elif t == 4:
-        return {"name": k, "type": "boolean"}
-
-
-def write_avro_file(avro_file, json_array, keys, values):
-    k = list(json_array[0].keys())
-
-    if keys != k:
-        raise ValueError("keys and values should have the same length")
-
-    avro_schema = {
-        "type": "record",
-        "name": "MyRecord",
-        "fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()]
-    }
-
-    avro_records = [to_avro_record(obj) for obj in json_array]
-    with open(avro_file + ".avro", 'wb') as f:
-        fastavro.writer(f, avro_schema, avro_records)
-
-
-def write_pg_file(json_array):
-    conn_str = "dbname=mydatabase user=myuser host=localhost"
-    conn = psycopg2.connect(conn_str)
-    cur = conn.cursor()
-
-    cur.execute("drop table if exists my_table")
-    conn.commit()
-
-    # create the table if it does not exist
-    cur.execute("""
-        CREATE TABLE IF NOT EXISTS my_table (
-            id SERIAL PRIMARY KEY,
-            json_data JSONB
-        );
-    """)
-    conn.commit()
-
-    # run a SQL query
-    cur.execute("SELECT count(*) FROM my_table")
-    # fetch the query result
-    rows = cur.fetchall()
-    # print the query result
-    for row in rows:
-        print("rows before:", row[0])
-
-    # insert the data
-    for idx, json_obj in enumerate(json_array):
-        # print(json.dumps(json_obj))
-        cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),))
-
-    conn.commit()  # commit the transaction
-
-    # run a SQL query
-    cur.execute("SELECT count(*) FROM my_table")
-    # fetch the query result
-    rows = cur.fetchall()
-    # print the query result
-    for row in rows:
-        print("rows after:", row[0])
-
-    # # run a SQL query
-    # cur.execute("SELECT pg_relation_size('my_table')")
-    # # fetch the query result
-    # rows = cur.fetchall()
-    # # print the query result
-    # size = 0
-    # for row in rows:
-    #     size = row[0]
-    #     print("table size:", row[0])
-
-    # close the cursor and the connection
-    cur.close()
-    conn.close()
-
-def read_parquet_file(parquet_file):
-    table = pq.read_table(parquet_file + ".parquet")
-    df = table.to_pandas()
-    print(df)
-
-
-def read_avro_file(avg_file):
-    with open(avg_file + ".avro", 'rb') as f:
-        reader = fastavro.reader(f)
-
-        for record in reader:
-            print(record)
-
-
-def read_json_file(csv_file):
-    with open(csv_file + ".json", 'r') as f:
-        data = json.load(f)
-        print(data)
-
-
-def main():
-    key_length = 7
-    key_sizes = 4000
-    row_sizes = 10000
-    file_name = "output"
-
-    # cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)]
-    cases = [(2, 2), (3, 3), (0, 4)]
-
-    for data in cases:
-        begin, end = data
-        print(f"case type: {begin}-{end}")
-
-        N = 2
-        for _ in range(N):
-
-            t0 = time.time()
-
-            keys = [generate_random_string(key_length) for _ in range(key_sizes)]
-            values = [random.randint(begin, end) for _ in range(key_sizes)]
-            # generate the JSON array
-            json_array = generate_json_array(keys, values, row_sizes)
-
-            t1 = time.time()
-
-            write_json_file(file_name, json_array)
-
-            t2 = time.time()
-
-            write_parquet_file(file_name, json_array)
-
-            t3 = time.time()
-
-            write_avro_file(file_name, json_array, keys, values)
-
-            t4 = time.time()
-
-            size = write_pg_file(json_array)
-
-            t5 = time.time()
-
-            print("json generation speed:", t2 - t0, "file size:", os.path.getsize(file_name + ".json"))
-            print("parquet speed:", t3 - t2, "file size:", os.path.getsize(file_name + ".parquet"))
-            print("avro speed:", t4 - t3, "file size:", os.path.getsize(file_name + ".avro"))
-            print("pg json speed:", t5 - t4, "file size:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024)
-
-            # read_json_file(file_name)
-            # read_parquet_file(file_name)
-            # read_avro_file(file_name)
-    print(f"\n---------------\n")
-
-if __name__ == "__main__":
-    main()
-
-# compress the files with lz4
-# import os
-#
-# import lz4.frame
-#
-#
-# files = ["output.json", "output.parquet", "output.avro"]
-# def compress_file(input_path, output_path):
-#     with open(input_path, 'rb') as f_in:
-#         compressed_data = lz4.frame.compress(f_in.read())
-#
-#     with open(output_path, 'wb') as f_out:
-#         f_out.write(compressed_data)
-#
-# for file in files:
-#     compress_file(file, file + ".lz4")
-#     print(file, "origin size:", os.path.getsize(file), " after lz4 size:", os.path.getsize(file + ".lz4"))
From 174b0a104e5cab39e3eaa953dc579c9f39cdef54 Mon Sep 17 00:00:00 2001
From: wangmm0220
Date: Mon, 1 Apr 2024 19:59:30 +0800
Subject: [PATCH 2/2] fix: case error

---
 source/dnode/mnode/impl/src/mndConsumer.c  |  8 ++++----
 tests/system-test/8-stream/stream_basic.py | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 52671f6b66..ed9333f480 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -292,16 +292,16 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons
 static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
   int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
   if (tlen <= 0){
-    return TSDB_CODE_OUT_OF_MEMORY;
+    return TSDB_CODE_TMQ_INVALID_MSG;
   }
   void *buf = rpcMallocCont(tlen);
   if (buf == NULL) {
     return TSDB_CODE_OUT_OF_MEMORY;
   }
 
-  if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){
+  if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
     rpcFreeCont(buf);
-    return TSDB_CODE_OUT_OF_MEMORY;
+    return TSDB_CODE_TMQ_INVALID_MSG;
   }
   pMsg->info.rsp = buf;
   pMsg->info.rspLen = tlen;
@@ -316,7 +316,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
   SMqConsumerObj *pConsumer = NULL;
 
   if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
-    code = TSDB_CODE_OUT_OF_MEMORY;
+    code = TSDB_CODE_TMQ_INVALID_MSG;
     goto end;
   }
 
diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py
index 5167423ea3..ff16bee787 100644
--- a/tests/system-test/8-stream/stream_basic.py
+++ b/tests/system-test/8-stream/stream_basic.py
@@ -96,9 +96,28 @@ class TDTestCase:
         time.sleep(2)
         tdSql.query("select * from sta")
         tdSql.checkRows(3)
+        tdSql.query("select tbname from sta order by tbname")
+        if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'):
+            tdLog.exit("error1")
+
+        if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'):
+            tdLog.exit("error2")
+
+        if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'):
+            tdLog.exit("error3")
 
         tdSql.query("select * from stb")
         tdSql.checkRows(3)
+        tdSql.query("select tbname from stb order by tbname")
+        if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'):
+            tdLog.exit("error4")
+
+        if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'):
+            tdLog.exit("error5")
+
+        if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'):
+            tdLog.exit("error6")
+
     # run
     def run(self):
         self.case1()
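
The error-code hunks in mndConsumer.c above follow a two-pass serialize pattern: tSerializeSMqHbRsp is first called with a NULL buffer to report the required size, rpcMallocCont allocates that many bytes, and a second call fills the buffer. The patch reserves TSDB_CODE_OUT_OF_MEMORY for the failed allocation alone and maps failed serialization or deserialization to TSDB_CODE_TMQ_INVALID_MSG; the serializer evidently returns a byte count, so success is a positive value, which is why the second check tightens from != 0 to <= 0. Below is a minimal, self-contained sketch of that convention; every name in it (encodeU32, buildRsp, ERR_OOM, ERR_INVALID_MSG) is an illustrative stub, not TDengine's real API.

/*
 * Sketch (not TDengine code) of the two-pass serialize pattern: pass 1 asks
 * for the required size, pass 2 fills the buffer.  Only a failed allocation
 * maps to the out-of-memory code; a failed size query or a failed fill maps
 * to the invalid-message code.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

enum { ERR_OK = 0, ERR_OOM = -1, ERR_INVALID_MSG = -2 };

/* Stub serializer: with buf == NULL it only reports the size needed;
 * otherwise it writes the payload.  Returns the byte count written (so
 * success is a positive value), or -1 if the buffer is too small. */
static int32_t encodeU32(void *buf, int32_t cap, uint32_t value) {
  if (buf == NULL) return (int32_t)sizeof(value);  /* size query */
  if (cap < (int32_t)sizeof(value)) return -1;     /* cannot encode */
  memcpy(buf, &value, sizeof(value));
  return (int32_t)sizeof(value);
}

static int buildRsp(uint32_t value, void **out, int32_t *outLen) {
  int32_t tlen = encodeU32(NULL, 0, value);
  if (tlen <= 0) return ERR_INVALID_MSG;   /* bad message, not OOM */

  void *buf = malloc(tlen);
  if (buf == NULL) return ERR_OOM;         /* the only true OOM path */

  if (encodeU32(buf, tlen, value) <= 0) {  /* note: <= 0, not != 0 */
    free(buf);
    return ERR_INVALID_MSG;
  }
  *out = buf;
  *outLen = tlen;
  return ERR_OK;
}

int main(void) {
  void   *rsp = NULL;
  int32_t len = 0;
  printf("code=%d len=%d\n", buildRsp(42u, &rsp, &len), len);
  free(rsp);
  return 0;
}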