Merge pull request #25234 from taosdata/fix/TD-28032

fix:cases error
dapan1121 2024-04-02 10:17:04 +08:00 committed by GitHub
commit eb50334baf
3 changed files with 23 additions and 247 deletions


@@ -292,16 +292,16 @@ static void storeOffsetRows(SMnode *pMnode, SMqHbReq *req, SMqConsumerObj *pCons
static int32_t buildMqHbRsp(SRpcMsg *pMsg, SMqHbRsp *rsp){
int32_t tlen = tSerializeSMqHbRsp(NULL, 0, rsp);
if (tlen <= 0){
- return TSDB_CODE_OUT_OF_MEMORY;
+ return TSDB_CODE_TMQ_INVALID_MSG;
}
void *buf = rpcMallocCont(tlen);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
- if(tSerializeSMqHbRsp(buf, tlen, rsp) != 0){
+ if(tSerializeSMqHbRsp(buf, tlen, rsp) <= 0){
rpcFreeCont(buf);
- return TSDB_CODE_OUT_OF_MEMORY;
+ return TSDB_CODE_TMQ_INVALID_MSG;
}
pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen;
@@ -316,7 +316,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer = NULL;
if (tDeserializeSMqHbReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
- code = TSDB_CODE_OUT_OF_MEMORY;
+ code = TSDB_CODE_TMQ_INVALID_MSG;
goto end;
}


@@ -96,9 +96,28 @@ class TDTestCase:
time.sleep(2)
tdSql.query("select * from sta")
tdSql.checkRows(3)
tdSql.query("select tbname from sta order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.sta_'):
tdLog.exit("error1")
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.sta_'):
tdLog.exit("error2")
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.sta_'):
tdLog.exit("error3")
tdSql.query("select * from stb")
tdSql.checkRows(3)
tdSql.query("select tbname from stb order by tbname")
if not tdSql.getData(0, 0).startswith('new-t1_1.d1.stb_'):
tdLog.exit("error4")
if not tdSql.getData(1, 0).startswith('new-t2_1.d1.stb_'):
tdLog.exit("error5")
if not tdSql.getData(2, 0).startswith('new-t3_1.d1.stb_'):
tdLog.exit("error6")
# run
def run(self):
self.case1()
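As a side note (not part of this PR), the repeated startswith checks added above could be collapsed into a small helper. A minimal sketch, assuming the usual tdSql/tdLog test utilities and the same expected prefixes; check_tbname_prefixes is a hypothetical name used only for illustration:

# hypothetical helper, for illustration only -- not part of the test file
def check_tbname_prefixes(table, prefixes):
    tdSql.query(f"select tbname from {table} order by tbname")
    for i, prefix in enumerate(prefixes):
        if not tdSql.getData(i, 0).startswith(prefix):
            tdLog.exit(f"unexpected tbname in {table} at row {i}")

# usage mirroring the checks above
check_tbname_prefixes("sta", ['new-t1_1.d1.sta_', 'new-t2_1.d1.sta_', 'new-t3_1.d1.sta_'])
check_tbname_prefixes("stb", ['new-t1_1.d1.stb_', 'new-t2_1.d1.stb_', 'new-t3_1.d1.stb_'])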


@@ -1,243 +0,0 @@
# Write a Python snippet that generates a JSON string: the string is an array of length 10000, and each element is a JSON string containing 4000 key-value pairs. The 4000 keys within one element are all different, while every element uses the same keys; each key is an English word and each value is an int in the range [0, 256]. Write the JSON string to a file in compact form, store it in a Parquet file, write it to an Avro file, and insert it into a PostgreSQL table with two columns: the first an int primary key, the second a JSON column, with each element of the array written into the JSON column.
import csv
import json
import os
import random
import string
import time
from faker import Faker
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro
import psycopg2
from psycopg2.extras import Json
def get_dir_size(start_path='.'):
total = 0
for dirpath, dirs, files in os.walk(start_path):
for f in files:
fp = os.path.join(dirpath, f)
# get the file size and add it to total
total += os.path.getsize(fp)
return total
def to_avro_record(obj):
return {key: value for key, value in obj.items()}
def generate_random_string(length):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def generate_random_values(t):
if t == 0:
return random.randint(-255, 256)
elif t == 1:
return random.randint(-2100000000, 2100000000)
elif t == 2:
return random.uniform(-10000.0, 10000.0)
elif t == 3:
return generate_random_string(10)
elif t == 4:
return random.choice([True, False])
def generate_json_object(key_set, value_set):
values = [generate_random_values(t) for t in value_set]
return dict(zip(key_set, values))
def generate_json_array(keys, values, array_length):
return [generate_json_object(keys, values) for _ in range(array_length)]
def write_parquet_file(parquet_file, json_array):
df = pd.DataFrame(json_array)
table = pa.Table.from_pandas(df)
pq.write_table(table, parquet_file + ".parquet")
def write_json_file(json_file, json_array):
with open(json_file + ".json", 'w') as f:
json.dump(json_array, f, separators=(',', ':'))
def generate_avro_schema(k, t):
if t == 0:
return {"name": k, "type": "int", "logicalType": "int"}
elif t == 1:
return {"name": k, "type": "int", "logicalType": "int"}
elif t == 2:
return {"name": k, "type": "float"}
elif t == 3:
return {"name": k, "type": "string"}
elif t == 4:
return {"name": k, "type": "boolean"}
def write_avro_file(avro_file, json_array, keys, values):
k = list(json_array[0].keys())
if keys != k:
raise ValueError("keys and values should have the same length")
avro_schema = {
"type": "record",
"name": "MyRecord",
"fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()]
}
avro_records = [to_avro_record(obj) for obj in json_array]
with open(avro_file + ".avro", 'wb') as f:
fastavro.writer(f, avro_schema, avro_records)
def write_pg_file(json_array):
conn_str = "dbname=mydatabase user=myuser host=localhost"
conn = psycopg2.connect(conn_str)
cur = conn.cursor()
cur.execute("drop table if exists my_table")
conn.commit()
# create the table if it does not exist
cur.execute("""
CREATE TABLE IF NOT EXISTS my_table (
id SERIAL PRIMARY KEY,
json_data JSONB
);
""")
conn.commit()
# run a SQL query
cur.execute("SELECT count(*) FROM my_table")
# fetch the query result
rows = cur.fetchall()
# print the query result
for row in rows:
print("rows before:", row[0])
# insert the data
for idx, json_obj in enumerate(json_array):
# print(json.dumps(json_obj))
cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),))
conn.commit() # commit the transaction
# run a SQL query
cur.execute("SELECT count(*) FROM my_table")
# fetch the query result
rows = cur.fetchall()
# print the query result
for row in rows:
print("rows after:", row[0])
# # run a SQL query
# cur.execute("SELECT pg_relation_size('my_table')")
# # fetch the query result
# rows = cur.fetchall()
# # print the query result
# size = 0
# for row in rows:
# size = row[0]
# print("table size:", row[0])
# close the cursor and the connection
cur.close()
conn.close()
def read_parquet_file(parquet_file):
table = pq.read_table(parquet_file + ".parquet")
df = table.to_pandas()
print(df)
def read_avro_file(avg_file):
with open(avg_file + ".avro", 'rb') as f:
reader = fastavro.reader(f)
for record in reader:
print(record)
def read_json_file(csv_file):
with open(csv_file + ".json", 'r') as f:
data = json.load(f)
print(data)
def main():
key_length = 7
key_sizes = 4000
row_sizes = 10000
file_name = "output"
# cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)]
cases = [(2, 2), (3, 3), (0, 4)]
for data in cases:
begin, end = data
print(f"执行类型:{begin}-{end}")
N = 2
for _ in range(N):
t0 = time.time()
keys = [generate_random_string(key_length) for _ in range(key_sizes)]
values = [random.randint(begin, end) for _ in range(key_sizes)]
# generate the JSON array
json_array = generate_json_array(keys, values, row_sizes)
t1 = time.time()
write_json_file(file_name, json_array)
t2 = time.time()
write_parquet_file(file_name, json_array)
t3 = time.time()
write_avro_file(file_name, json_array, keys, values)
t4 = time.time()
size = write_pg_file(json_array)
t5 = time.time()
print("生成json 速度:", t2 - t0, "文件大小:", os.path.getsize(file_name + ".json"))
print("parquet 速度:", t3 - t2, "文件大小:", os.path.getsize(file_name + ".parquet"))
print("avro 速度:", t4 - t3, "文件大小:", os.path.getsize(file_name + ".avro"))
print("pg json 速度:", t5 - t4, "文件大小:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024)
# read_json_file(file_name)
# read_parquet_file(file_name)
# read_avro_file(file_name)
print(f"\n---------------\n")
if __name__ == "__main__":
main()
# compress the output files
# import os
#
# import lz4.frame
#
#
# files =["output.json", "output.parquet", "output.avro"]
# def compress_file(input_path, output_path):
# with open(input_path, 'rb') as f_in:
# compressed_data = lz4.frame.compress(f_in.read())
#
# with open(output_path, 'wb') as f_out:
# f_out.write(compressed_data)
#
# for file in files:
# compress_file(file, file + ".lz4")
# print(file, "origin size:", os.path.getsize(file), " after lsz size:", os.path.getsize(file + ".lz4"))
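The commented-out snippet above compares file sizes after LZ4 compression. For reference, a self-contained sketch of that comparison, assuming the python lz4 package is installed and the three output files from main() already exist:

# standalone version of the commented-out LZ4 comparison above (assumption:
# the "lz4" package is installed and the output.* files were generated by main())
import os
import lz4.frame

def compress_file(input_path, output_path):
    with open(input_path, 'rb') as f_in:
        compressed = lz4.frame.compress(f_in.read())
    with open(output_path, 'wb') as f_out:
        f_out.write(compressed)

for name in ["output.json", "output.parquet", "output.avro"]:
    compress_file(name, name + ".lz4")
    print(name, "original size:", os.path.getsize(name),
          "lz4 size:", os.path.getsize(name + ".lz4"))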