From 8e2a2414c0d19f1470d5b17be331588a45896849 Mon Sep 17 00:00:00 2001
From: wangmm0220
Date: Thu, 28 Mar 2024 13:48:46 +0800
Subject: [PATCH 1/3] fix: same subtable and same partition-by leads to the
 same table name in stream

---
 include/common/tdatablock.h                |  2 +-
 include/libs/stream/tstream.h              |  2 +-
 source/common/src/tdatablock.c             | 11 ++++++++---
 source/dnode/mnode/impl/src/mndDef.c       |  4 +++-
 source/dnode/mnode/impl/src/mndStream.c    |  4 +++-
 source/dnode/vnode/src/tq/tqSink.c         | 18 +++++++++++-------
 source/libs/stream/src/streamDispatch.c    |  9 ++++++---
 source/libs/stream/src/streamMeta.c        |  4 +++-
 tests/system-test/8-stream/stream_basic.py | 22 ++++++++++++++++++++++
 9 files changed, 58 insertions(+), 18 deletions(-)

diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h
index fa6e4c3ae8..cf6d4ba2b0 100644
--- a/include/common/tdatablock.h
+++ b/include/common/tdatablock.h
@@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
 
 bool alreadyAddGroupId(char* ctbName);
 bool isAutoTableName(char* ctbName);
-void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId);
+void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
 char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
 int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
 
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 5f3761d7b7..e7c6491b9d 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask;
 typedef struct SStreamQueue SStreamQueue;
 typedef struct SStreamTaskSM SStreamTaskSM;
 
-#define SSTREAM_TASK_VER 3
+#define SSTREAM_TASK_VER 4
 #define SSTREAM_TASK_INCOMPATIBLE_VER 1
 #define SSTREAM_TASK_NEED_CONVERT_VER 2
 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index f4455be206..dc1c27b123 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -2141,10 +2141,14 @@ _end:
   return TSDB_CODE_SUCCESS;
 }
 
-void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){
+void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){
   char tmp[TSDB_TABLE_NAME_LEN] = {0};
-  snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
-  ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0;  // put groupId to the end
+  if (stbName == NULL){
+    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
+  }else{
+    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId);
+  }
+  ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0;  // put stbname + groupId to the end
   strcat(ctbName, tmp);
 }
 
@@ -2154,6 +2158,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0
 
 bool alreadyAddGroupId(char* ctbName) {
   size_t len = strlen(ctbName);
+  if (len == 0) return false;
   size_t _location = len - 1;
   while (_location > 0) {
     if (ctbName[_location] < '0' || ctbName[_location] > '9') {
diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c
index 3f69c7def3..091edc6ab0 100644
--- a/source/dnode/mnode/impl/src/mndDef.c
+++ b/source/dnode/mnode/impl/src/mndDef.c
@@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
     if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
     for (int32_t j = 0; j < innerSz; j++) {
       SStreamTask *pTask = taosArrayGetP(pArray, j);
-      pTask->ver = SSTREAM_TASK_VER;
+      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+        pTask->ver = SSTREAM_TASK_VER;
+      }
       if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
     }
   }
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 6067af199e..ff05db417e 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
   SEncoder encoder;
   tEncoderInit(&encoder, NULL, 0);
 
-  pTask->ver = SSTREAM_TASK_VER;
+  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+    pTask->ver = SSTREAM_TASK_VER;
+  }
   tEncodeStreamTask(&encoder, pTask);
 
   int32_t size = encoder.pos;
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 5374b9aa78..1e0ae7a854 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
     if (varTbName != NULL && varTbName != (void*)-1) {
       name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
       memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
-      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
-        buildCtbNameAddGroupId(name, groupId);
+      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
+        buildCtbNameAddGroupId(stbFullName, name, groupId);
       }
     } else if (stbFullName) {
       name = buildCtbNameByGroupId(stbFullName, groupId);
@@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
                                 int64_t gid, bool newSubTableRule) {
   if (pDataBlock->info.parTbName[0]) {
     if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
-        !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
+        !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
       pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
       strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
-      buildCtbNameAddGroupId(pCreateTableReq->name, gid);
+      buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
       //      tqDebug("gen name from:%s", pDataBlock->info.parTbName);
     } else {
       pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
@@ -671,10 +671,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
       memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
       buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
     } else {
-      if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
-          !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
+      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
+          !alreadyAddGroupId(dstTableName) && groupId != 0) {
         tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
-        buildCtbNameAddGroupId(dstTableName, groupId);
+        if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+          buildCtbNameAddGroupId(NULL, dstTableName, groupId);
+        }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
+          buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
+        }
       }
     }
 
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 0af664f1e1..9542009d72 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -580,12 +580,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
   } else {
     char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
     if (pDataBlock->info.parTbName[0]) {
-      if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
-         pTask->subtableWithoutMd5 != 1 &&
+      if(pTask->subtableWithoutMd5 != 1 &&
          !isAutoTableName(pDataBlock->info.parTbName) &&
          !alreadyAddGroupId(pDataBlock->info.parTbName) &&
          groupId != 0){
-        buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId);
+        if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+          buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
+        }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
+          buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
+        }
       }
     } else {
       buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index c24763c024..aae3594905 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
   void*   buf = NULL;
   int32_t len;
   int32_t code;
-  pTask->ver = SSTREAM_TASK_VER;
   tEncodeSize(tEncodeStreamTask, pTask, len, code);
   if (code < 0) {
     return -1;
@@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
     return -1;
   }
 
+  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+    pTask->ver = SSTREAM_TASK_VER;
+  }
   SEncoder encoder = {0};
   tEncoderInit(&encoder, buf, len);
   tEncodeStreamTask(&encoder, pTask);
diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py
index 3ebc255114..f08cca1e13 100644
--- a/tests/system-test/8-stream/stream_basic.py
+++ b/tests/system-test/8-stream/stream_basic.py
@@ -78,8 +78,30 @@ class TDTestCase:
         tdLog.info(cmd)
         os.system(cmd)
 
+    def case1(self):
+
+        tdSql.execute(f'create database if not exists d1 vgroups 1')
+        tdSql.execute(f'use d1')
+        tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
+        tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
+        tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
+        tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
+
+        tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
+                      "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
+
+        tdSql.execute("create stream stream2 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
+                      "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
+
+        time.sleep(2)
+        tdSql.query("select * from sta")
+        tdSql.checkRows(3)
+
+        tdSql.query("select * from stb")
+        tdSql.checkRows(3)
     # run
     def run(self):
+        self.case1()
         # gen data
         random.seed(int(time.time()))
         self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
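[Note on PATCH 1/3 — not part of the diff] The heart of the fix is the new signature of buildCtbNameAddGroupId(): for tasks newer than SSTREAM_TASK_SUBTABLE_CHANGED_VER, the destination stable's name is spliced in between the user-defined subtable prefix and the group id, so two streams with identical subtable() and PARTITION BY clauses no longer generate the same child-table name. Below is a minimal Python model of the before/after behavior, not an authoritative reimplementation: TSDB_TABLE_NAME_LEN is assumed to be 193 per the TDengine 3.x headers, the group id is a made-up placeholder (real ones are runtime hashes), and the C callers pass the stable's fully-qualified name, so the real infix may carry a database prefix.

    TSDB_TABLE_NAME_LEN = 193  # assumed value from TDengine 3.x headers

    def add_group_id(ctb_name, group_id, stb_name=None):
        # Mirrors buildCtbNameAddGroupId(): old rule when stb_name is None,
        # new rule otherwise; the base name is truncated so the full result
        # always fits within TSDB_TABLE_NAME_LEN - 1 characters.
        suffix = ("_%d" % group_id) if stb_name is None else ("_%s_%d" % (stb_name, group_id))
        return ctb_name[:TSDB_TABLE_NAME_LEN - 1 - len(suffix)] + suffix

    gid = 123456789  # placeholder group id
    # Old rule: both streams in the test collide on one name.
    add_group_id("new-t1", gid)         # -> 'new-t1_123456789'
    add_group_id("new-t1", gid)         # -> 'new-t1_123456789' (duplicate)
    # New rule: the target stable's name keeps them apart.
    add_group_id("new-t1", gid, "sta")  # -> 'new-t1_sta_123456789'
    add_group_id("new-t1", gid, "stb")  # -> 'new-t1_stb_123456789'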
From 570d78c58d201aa275bf716a0d971b80d08a7b4c Mon Sep 17 00:00:00 2001
From: wangmm0220
Date: Thu, 28 Mar 2024 17:54:00 +0800
Subject: [PATCH 2/3] fix: test case error

---
 tests/system-test/8-stream/stream_basic.py |   2 +-
 tests/system-test/buildJson.py             | 243 +++++++++++++++++++++
 2 files changed, 244 insertions(+), 1 deletion(-)
 create mode 100644 tests/system-test/buildJson.py

diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py
index f08cca1e13..8dfb14da56 100644
--- a/tests/system-test/8-stream/stream_basic.py
+++ b/tests/system-test/8-stream/stream_basic.py
@@ -90,7 +90,7 @@ class TDTestCase:
         tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
                       "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
 
-        tdSql.execute("create stream stream2 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
+        tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT "
                       "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
 
         time.sleep(2)
diff --git a/tests/system-test/buildJson.py b/tests/system-test/buildJson.py
new file mode 100644
index 0000000000..6e9e9f83e1
--- /dev/null
+++ b/tests/system-test/buildJson.py
@@ -0,0 +1,243 @@
+# Generate a JSON array of length 10000 whose elements each hold 4000 key-value pairs; within an element the 4000 keys differ, while all elements share the same key set. Keys are English words, values are ints in [0, 256]. Write the array compactly to a file, to a Parquet file, to an Avro file, and into a PostgreSQL table with an int primary-key column and a JSON column holding one array element per row.
+import csv
+import json
+import os
+import random
+import string
+import time
+
+from faker import Faker
+import pandas as pd
+import pyarrow as pa
+import pyarrow.parquet as pq
+import fastavro
+import psycopg2
+from psycopg2.extras import Json
+
+
+def get_dir_size(start_path='.'):
+    total = 0
+    for dirpath, dirs, files in os.walk(start_path):
+        for f in files:
+            fp = os.path.join(dirpath, f)
+            # accumulate each file's size into total
+            total += os.path.getsize(fp)
+    return total
+
+
+def to_avro_record(obj):
+    return {key: value for key, value in obj.items()}
+
+
+def generate_random_string(length):
+    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
+
+
+def generate_random_values(t):
+    if t == 0:
+        return random.randint(-255, 256)
+    elif t == 1:
+        return random.randint(-2100000000, 2100000000)
+    elif t == 2:
+        return random.uniform(-10000.0, 10000.0)
+    elif t == 3:
+        return generate_random_string(10)
+    elif t == 4:
+        return random.choice([True, False])
+
+
+def generate_json_object(key_set, value_set):
+    values = [generate_random_values(t) for t in value_set]
+    return dict(zip(key_set, values))
+
+
+def generate_json_array(keys, values, array_length):
+    return [generate_json_object(keys, values) for _ in range(array_length)]
+
+
+def write_parquet_file(parquet_file, json_array):
+    df = pd.DataFrame(json_array)
+    table = pa.Table.from_pandas(df)
+    pq.write_table(table, parquet_file + ".parquet")
+
+
+def write_json_file(json_file, json_array):
+    with open(json_file + ".json", 'w') as f:
+        json.dump(json_array, f, separators=(',', ':'))
+
+
+def generate_avro_schema(k, t):
+    if t == 0:
+        return {"name": k, "type": "int", "logicalType": "int"}
+    elif t == 1:
+        return {"name": k, "type": "int", "logicalType": "int"}
+    elif t == 2:
+        return {"name": k, "type": "float"}
+    elif t == 3:
+        return {"name": k, "type": "string"}
+    elif t == 4:
+        return {"name": k, "type": "boolean"}
+
+
+def write_avro_file(avro_file, json_array, keys, values):
+    k = list(json_array[0].keys())
+
+    if keys != k:
+        raise ValueError("keys of json_array do not match the generated key list")
+
+    avro_schema = {
+        "type": "record",
+        "name": "MyRecord",
+        "fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()]
+    }
+
+    avro_records = [to_avro_record(obj) for obj in json_array]
+    with open(avro_file + ".avro", 'wb') as f:
+        fastavro.writer(f, avro_schema, avro_records)
+
+
+def write_pg_file(json_array):
+    conn_str = "dbname=mydatabase user=myuser host=localhost"
+    conn = psycopg2.connect(conn_str)
+    cur = conn.cursor()
+
+    cur.execute("drop table if exists my_table")
+    conn.commit()
+
+    # create the table (if it does not exist)
+    cur.execute("""
+    CREATE TABLE IF NOT EXISTS my_table (
+        id SERIAL PRIMARY KEY,
+        json_data JSONB
+    );
+    """)
+    conn.commit()
+
+    # run a SQL query
+    cur.execute("SELECT count(*) FROM my_table")
+    # fetch the query result
+    rows = cur.fetchall()
+    # print the query result
+    for row in rows:
+        print("rows before:", row[0])
+
+    # insert the data
+    for idx, json_obj in enumerate(json_array):
+        # print(json.dumps(json_obj))
+        cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),))
+
+    conn.commit()  # commit the transaction
+
+    # run a SQL query
+    cur.execute("SELECT count(*) FROM my_table")
+    # fetch the query result
+    rows = cur.fetchall()
+    # print the query result
+    for row in rows:
+        print("rows after:", row[0])
+
+    # # run a SQL query
+    # cur.execute("SELECT pg_relation_size('my_table')")
+    # # fetch the query result
+    # rows = cur.fetchall()
+    # # print the query result
+    # size = 0
+    # for row in rows:
+    #     size = row[0]
+    #     print("table size:", row[0])
+
+    # close the cursor and connection
+    cur.close()
+    conn.close()
+
+
+def read_parquet_file(parquet_file):
+    table = pq.read_table(parquet_file + ".parquet")
+    df = table.to_pandas()
+    print(df)
+
+
+def read_avro_file(avg_file):
+    with open(avg_file + ".avro", 'rb') as f:
+        reader = fastavro.reader(f)
+
+        for record in reader:
+            print(record)
+
+
+def read_json_file(csv_file):
+    with open(csv_file + ".json", 'r') as f:
+        data = json.load(f)
+        print(data)
+
+
+def main():
+    key_length = 7
+    key_sizes = 4000
+    row_sizes = 10000
+    file_name = "output"
+
+    # cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)]
+    cases = [(2, 2), (3, 3), (0, 4)]
+
+    for data in cases:
+        begin, end = data
+        print(f"case type: {begin}-{end}")
+
+        N = 2
+        for _ in range(N):
+
+            t0 = time.time()
+
+            keys = [generate_random_string(key_length) for _ in range(key_sizes)]
+            values = [random.randint(begin, end) for _ in range(key_sizes)]
+            # generate the JSON array
+            json_array = generate_json_array(keys, values, row_sizes)
+
+            t1 = time.time()
+
+            write_json_file(file_name, json_array)
+
+            t2 = time.time()
+
+            write_parquet_file(file_name, json_array)
+
+            t3 = time.time()
+
+            write_avro_file(file_name, json_array, keys, values)
+
+            t4 = time.time()
+
+            write_pg_file(json_array)
+
+            t5 = time.time()
+
+            print("json generate time:", t2 - t0, "file size:", os.path.getsize(file_name + ".json"))
+            print("parquet time:", t3 - t2, "file size:", os.path.getsize(file_name + ".parquet"))
+            print("avro time:", t4 - t3, "file size:", os.path.getsize(file_name + ".avro"))
+            print("pg json time:", t5 - t4, "file size:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024)
+
+            # read_json_file(file_name)
+            # read_parquet_file(file_name)
+            # read_avro_file(file_name)
+        print(f"\n---------------\n")
+
+
+if __name__ == "__main__":
+    main()
+
+# compress the output files
+# import os
+#
+# import lz4.frame
+#
+#
+# files = ["output.json", "output.parquet", "output.avro"]
+# def compress_file(input_path, output_path):
+#     with open(input_path, 'rb') as f_in:
+#         compressed_data = lz4.frame.compress(f_in.read())
+#
+#     with open(output_path, 'wb') as f_out:
+#         f_out.write(compressed_data)
+#
+# for file in files:
+#     compress_file(file, file + ".lz4")
+#     print(file, "origin size:", os.path.getsize(file), " after lz4 size:", os.path.getsize(file + ".lz4"))
From 00cc68a7903e689941bf32b8da2a69a921f7b350 Mon Sep 17 00:00:00 2001
From: wangmm0220
Date: Fri, 29 Mar 2024 09:07:09 +0800
Subject: [PATCH 3/3] fix: test case error

---
 tests/system-test/8-stream/stream_basic.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/system-test/8-stream/stream_basic.py b/tests/system-test/8-stream/stream_basic.py
index 8dfb14da56..5167423ea3 100644
--- a/tests/system-test/8-stream/stream_basic.py
+++ b/tests/system-test/8-stream/stream_basic.py
@@ -107,7 +107,7 @@ class TDTestCase:
         self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
         # create stream
         tdSql.execute("use db")
-        tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
+        tdSql.execute("create stream stream3 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
         sql = "select count(*) from sta"
         # loop wait max 60s to check count is ok
         tdLog.info("loop wait result ...")
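[Note on PATCH 3/3 — not part of the diff] The rename to stream3 is presumably needed because case1() now runs first inside the same run() and has already created a stream named stream1. To assert the new naming rule from patch 1 directly, a follow-up check along these lines could be added to case1() — a sketch only, assuming the information_schema.ins_tables layout of TDengine 3.x and that the generated child-table names keep their "new-" prefix:

    # After both streams have produced their windows, the child tables created
    # under d1 should be disjoint between sta and stb (3 partitions each).
    tdSql.query("select table_name, stable_name from information_schema.ins_tables "
                "where db_name = 'd1' and table_name like 'new-%'")
    tdSql.checkRows(6)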