Merge pull request #25211 from taosdata/fix/TD-28032

fix: same subtable expression and same partition by lead to the same table name in stream
Haojun Liao authored on 2024-04-01 15:49:47 +08:00, committed by GitHub
commit c6294200ea
10 changed files with 302 additions and 19 deletions

View File

@@ -267,7 +267,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
 bool alreadyAddGroupId(char* ctbName);
 bool isAutoTableName(char* ctbName);
-void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId);
+void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId);
 char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
 int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);

View File

@@ -61,7 +61,7 @@ typedef struct SStreamTask SStreamTask;
 typedef struct SStreamQueue SStreamQueue;
 typedef struct SStreamTaskSM SStreamTaskSM;
-#define SSTREAM_TASK_VER 3
+#define SSTREAM_TASK_VER 4
 #define SSTREAM_TASK_INCOMPATIBLE_VER 1
 #define SSTREAM_TASK_NEED_CONVERT_VER 2
 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3

View File

@@ -2141,10 +2141,14 @@ _end:
   return TSDB_CODE_SUCCESS;
 }
-void buildCtbNameAddGroupId(char* ctbName, uint64_t groupId){
+void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){
   char tmp[TSDB_TABLE_NAME_LEN] = {0};
-  snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
-  ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put groupId to the end
+  if (stbName == NULL){
+    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId);
+  }else{
+    snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId);
+  }
+  ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end
   strcat(ctbName, tmp);
 }
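The new signature folds the destination stable's name into the generated suffix: the old code appended only "_<groupId>", so two streams with the same subtable expression and the same partition key produced identical child-table names. For orientation, a minimal Python sketch of the suffix rule above; the 192-character limit and the sample names are assumptions standing in for TSDB_TABLE_NAME_LEN and real stream output, not values taken from the source.

# Hypothetical sketch of buildCtbNameAddGroupId(); TABLE_NAME_LIMIT is an assumed
# stand-in for TSDB_TABLE_NAME_LEN - 1.
TABLE_NAME_LIMIT = 192

def build_ctb_name_add_group_id(stb_name, ctb_name, group_id):
    # old rule: "_<groupId>"; new rule also embeds the stable name
    suffix = "_%d" % group_id if stb_name is None else "_%s_%d" % (stb_name, group_id)
    # truncate the base name so that base + suffix still fits the name limit
    return ctb_name[:TABLE_NAME_LIMIT - len(suffix)] + suffix

print(build_ctb_name_add_group_id(None, "new-t1", 123))   # new-t1_123  (pre-fix flavour)
print(build_ctb_name_add_group_id("sta", "new-t1", 123))  # new-t1_sta_123
print(build_ctb_name_add_group_id("stb", "new-t1", 123))  # new-t1_stb_123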
@@ -2154,6 +2158,7 @@ bool isAutoTableName(char* ctbName) { return (strlen(ctbName) == 34 && ctbName[0
 bool alreadyAddGroupId(char* ctbName) {
   size_t len = strlen(ctbName);
+  if (len == 0) return false;
   size_t _location = len - 1;
   while (_location > 0) {
     if (ctbName[_location] < '0' || ctbName[_location] > '9') {
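The added `if (len == 0) return false;` guards the subtraction that follows: `len` is a `size_t`, so `len - 1` on an empty name wraps around to a huge index instead of -1. The rest of the function is cut off in this hunk; the Python sketch below only illustrates the kind of "_<digits>" suffix check it performs and is an assumption, not the exact C logic.

def already_add_group_id(ctb_name):
    # guard introduced by this commit: an empty name cannot carry a groupId suffix
    if not ctb_name:
        return False
    # walk back over trailing digits; a "_<digits>" tail means a suffix was already appended
    i = len(ctb_name) - 1
    while i > 0 and ctb_name[i].isdigit():
        i -= 1
    return ctb_name[i] == '_' and i < len(ctb_name) - 1

print(already_add_group_id("new-t1_123"))  # True
print(already_add_group_id("new-t1"))      # False
print(already_add_group_id(""))            # False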

View File

@@ -72,7 +72,9 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
     if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
     for (int32_t j = 0; j < innerSz; j++) {
       SStreamTask *pTask = taosArrayGetP(pArray, j);
-      pTask->ver = SSTREAM_TASK_VER;
+      if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+        pTask->ver = SSTREAM_TASK_VER;
+      }
       if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1;
     }
   }

View File

@@ -434,7 +434,9 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
   SEncoder encoder;
   tEncoderInit(&encoder, NULL, 0);
-  pTask->ver = SSTREAM_TASK_VER;
+  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+    pTask->ver = SSTREAM_TASK_VER;
+  }
   tEncodeStreamTask(&encoder, pTask);
   int32_t size = encoder.pos;
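Both mnode hunks turn an unconditional version bump into a gated one: previously every encode or deploy set pTask->ver to the latest SSTREAM_TASK_VER, which would silently move already-deployed streams onto the new naming rule. Now only tasks older than SSTREAM_TASK_SUBTABLE_CHANGED_VER are raised, so a stream persisted at version 3 keeps the groupId-only suffix. A small sketch of the intent (not the encoder itself):

SSTREAM_TASK_VER = 4                   # latest version after this commit
SSTREAM_TASK_SUBTABLE_CHANGED_VER = 3  # version that introduced the "_<groupId>" suffix

def version_to_persist(task_ver):
    # tasks from before the subtable change are upgraded; everything else keeps its version
    return SSTREAM_TASK_VER if task_ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER else task_ver

print(version_to_persist(2))  # 4 - old task, upgraded to the new rule
print(version_to_persist(3))  # 3 - keeps the groupId-only suffix
print(version_to_persist(4))  # 4 - created with the stbName + groupId suffix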

View File

@@ -71,8 +71,8 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p
     if (varTbName != NULL && varTbName != (void*)-1) {
       name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
       memcpy(name, varDataVal(varTbName), varDataLen(varTbName));
-      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0) {
-        buildCtbNameAddGroupId(name, groupId);
+      if (newSubTableRule && !isAutoTableName(name) && !alreadyAddGroupId(name) && groupId != 0 && stbFullName) {
+        buildCtbNameAddGroupId(stbFullName, name, groupId);
       }
     } else if (stbFullName) {
       name = buildCtbNameByGroupId(stbFullName, groupId);
@@ -182,10 +182,10 @@ void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDa
                                 int64_t gid, bool newSubTableRule) {
   if (pDataBlock->info.parTbName[0]) {
     if (newSubTableRule && !isAutoTableName(pDataBlock->info.parTbName) &&
-        !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0) {
+        !alreadyAddGroupId(pDataBlock->info.parTbName) && gid != 0 && stbFullName) {
       pCreateTableReq->name = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
       strcpy(pCreateTableReq->name, pDataBlock->info.parTbName);
-      buildCtbNameAddGroupId(pCreateTableReq->name, gid);
+      buildCtbNameAddGroupId(stbFullName, pCreateTableReq->name, gid);
       // tqDebug("gen name from:%s", pDataBlock->info.parTbName);
     } else {
       pCreateTableReq->name = taosStrdup(pDataBlock->info.parTbName);
@@ -671,10 +671,14 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
       memset(dstTableName, 0, TSDB_TABLE_NAME_LEN);
       buildCtbNameByGroupIdImpl(stbFullName, groupId, dstTableName);
     } else {
-      if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1 &&
-          !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName) && groupId != 0) {
+      if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
+          !alreadyAddGroupId(dstTableName) && groupId != 0) {
         tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
-        buildCtbNameAddGroupId(dstTableName, groupId);
+        if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+          buildCtbNameAddGroupId(NULL, dstTableName, groupId);
+        }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
+          buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
+        }
       }
     }
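In the sink the task version now selects the suffix flavour: a task at exactly SSTREAM_TASK_SUBTABLE_CHANGED_VER keeps the old groupId-only name, while newer tasks pass the stable name into buildCtbNameAddGroupId. A hedged Python sketch of that branch, with the isAutoTableName/alreadyAddGroupId checks and the length truncation omitted for brevity:

SSTREAM_TASK_SUBTABLE_CHANGED_VER = 3

def sink_dst_table_name(task_ver, subtable_without_md5, stb_full_name, dst_table_name, group_id):
    # user-defined subtable names without md5 handling, or groupId 0, are left untouched
    if subtable_without_md5 == 1 or group_id == 0:
        return dst_table_name
    if task_ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER:
        return "%s_%d" % (dst_table_name, group_id)                    # old suffix
    if task_ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER and stb_full_name:
        return "%s_%s_%d" % (dst_table_name, stb_full_name, group_id)  # new suffix
    return dst_table_name

print(sink_dst_table_name(3, 0, "sta", "new-t1", 123))  # new-t1_123
print(sink_dst_table_name(4, 0, "sta", "new-t1", 123))  # new-t1_sta_123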

View File

@@ -580,12 +580,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
     } else {
       char ctbName[TSDB_TABLE_FNAME_LEN] = {0};
       if (pDataBlock->info.parTbName[0]) {
-        if(pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER &&
-           pTask->subtableWithoutMd5 != 1 &&
+        if(pTask->subtableWithoutMd5 != 1 &&
           !isAutoTableName(pDataBlock->info.parTbName) &&
           !alreadyAddGroupId(pDataBlock->info.parTbName) &&
           groupId != 0){
-          buildCtbNameAddGroupId(pDataBlock->info.parTbName, groupId);
+          if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+            buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId);
+          }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
+            buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName, groupId);
+          }
         }
       } else {
         buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName);

View File

@@ -542,7 +542,6 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
   void* buf = NULL;
   int32_t len;
   int32_t code;
-  pTask->ver = SSTREAM_TASK_VER;
   tEncodeSize(tEncodeStreamTask, pTask, len, code);
   if (code < 0) {
     return -1;
@@ -552,6 +551,9 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
     return -1;
   }
+  if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
+    pTask->ver = SSTREAM_TASK_VER;
+  }
   SEncoder encoder = {0};
   tEncoderInit(&encoder, buf, len);
   tEncodeStreamTask(&encoder, pTask);

View File

@@ -78,14 +78,36 @@ class TDTestCase:
         tdLog.info(cmd)
         os.system(cmd)
+    def case1(self):
+        tdSql.execute(f'create database if not exists d1 vgroups 1')
+        tdSql.execute(f'use d1')
+        tdSql.execute(f'create table st(ts timestamp, i int) tags(t int)')
+        tdSql.execute(f'insert into t1 using st tags(1) values(now, 1) (now+1s, 2)')
+        tdSql.execute(f'insert into t2 using st tags(2) values(now, 1) (now+1s, 2)')
+        tdSql.execute(f'insert into t3 using st tags(3) values(now, 1) (now+1s, 2)')
+        tdSql.execute("create stream stream1 fill_history 1 into sta subtable(concat('new-', tname)) AS SELECT "
+                      "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
+        tdSql.execute("create stream stream2 fill_history 1 into stb subtable(concat('new-', tname)) AS SELECT "
+                      "_wstart, count(*), avg(i) FROM st PARTITION BY tbname tname INTERVAL(1m)", show=True)
+        time.sleep(2)
+        tdSql.query("select * from sta")
+        tdSql.checkRows(3)
+        tdSql.query("select * from stb")
+        tdSql.checkRows(3)
     # run
     def run(self):
+        self.case1()
         # gen data
        random.seed(int(time.time()))
         self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
         # create stream
         tdSql.execute("use db")
         tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
         tdSql.execute("create stream stream3 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
         sql = "select count(*) from sta"
         # loop wait max 60s to check count is ok
         tdLog.info("loop wait result ...")
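case1 above is the regression test for this PR: stream1 and stream2 share the same subtable expression and the same PARTITION BY key, so before the fix both derived the identical child-table name from the partition name and groupId and their output collided; with the destination stable folded into the suffix, the two result sets stay separate (3 rows each in sta and stb). A self-contained recap of the naming difference, using placeholder names and a made-up groupId:

def suffixed(stb_name, base, group_id):
    # hypothetical mirror of the old (stb_name=None) and new suffix rules
    return "%s_%d" % (base, group_id) if stb_name is None else "%s_%s_%d" % (base, stb_name, group_id)

group_id = 123456789  # placeholder: both streams compute the same groupId for partition t1
print(suffixed(None, "new-t1", group_id))   # old rule: same name for both streams -> collision
print(suffixed("sta", "new-t1", group_id))  # new rule: distinct per destination stable
print(suffixed("stb", "new-t1", group_id))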

View File

@@ -0,0 +1,243 @@
# Write a Python program that builds a JSON payload: an array of length 10000 whose elements are
# JSON objects with 4000 key-value pairs each. Within one element the 4000 keys are all different,
# every element shares the same key set, keys are English words, and values are ints in [0, 256].
# Write the JSON array to a file in compact form, store it in a Parquet file, write it to an Avro
# file, and insert it into a PostgreSQL table with two columns: an int primary key and a JSON
# column, one array element per JSON row.
import csv
import json
import os
import random
import string
import time
from faker import Faker
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import fastavro
import psycopg2
from psycopg2.extras import Json


def get_dir_size(start_path='.'):
    total = 0
    for dirpath, dirs, files in os.walk(start_path):
        for f in files:
            fp = os.path.join(dirpath, f)
            # add the size of each file to the running total
            total += os.path.getsize(fp)
    return total


def to_avro_record(obj):
    return {key: value for key, value in obj.items()}


def generate_random_string(length):
    return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def generate_random_values(t):
    if t == 0:
        return random.randint(-255, 256)
    elif t == 1:
        return random.randint(-2100000000, 2100000000)
    elif t == 2:
        return random.uniform(-10000.0, 10000.0)
    elif t == 3:
        return generate_random_string(10)
    elif t == 4:
        return random.choice([True, False])


def generate_json_object(key_set, value_set):
    values = [generate_random_values(t) for t in value_set]
    return dict(zip(key_set, values))


def generate_json_array(keys, values, array_length):
    return [generate_json_object(keys, values) for _ in range(array_length)]


def write_parquet_file(parquet_file, json_array):
    df = pd.DataFrame(json_array)
    table = pa.Table.from_pandas(df)
    pq.write_table(table, parquet_file + ".parquet")


def write_json_file(json_file, json_array):
    with open(json_file + ".json", 'w') as f:
        json.dump(json_array, f, separators=(',', ':'))
def generate_avro_schema(k, t):
    if t == 0:
        return {"name": k, "type": "int", "logicalType": "int"}
    elif t == 1:
        return {"name": k, "type": "int", "logicalType": "int"}
    elif t == 2:
        return {"name": k, "type": "float"}
    elif t == 3:
        return {"name": k, "type": "string"}
    elif t == 4:
        return {"name": k, "type": "boolean"}


def write_avro_file(avro_file, json_array, keys, values):
    k = list(json_array[0].keys())
    if keys != k:
        raise ValueError("keys of the generated objects do not match the provided key list")
    avro_schema = {
        "type": "record",
        "name": "MyRecord",
        "fields": [generate_avro_schema(k, v) for k, v in dict(zip(keys, values)).items()]
    }
    avro_records = [to_avro_record(obj) for obj in json_array]
    with open(avro_file + ".avro", 'wb') as f:
        fastavro.writer(f, avro_schema, avro_records)
def write_pg_file(json_array):
    conn_str = "dbname=mydatabase user=myuser host=localhost"
    conn = psycopg2.connect(conn_str)
    cur = conn.cursor()

    cur.execute("drop table if exists my_table")
    conn.commit()

    # create the table if it does not exist
    cur.execute("""
        CREATE TABLE IF NOT EXISTS my_table (
            id SERIAL PRIMARY KEY,
            json_data JSONB
        );
    """)
    conn.commit()

    # count and print the rows before inserting
    cur.execute("SELECT count(*) FROM my_table")
    rows = cur.fetchall()
    for row in rows:
        print("rows before:", row[0])

    # insert the data
    for idx, json_obj in enumerate(json_array):
        # print(json.dumps(json_obj))
        cur.execute("INSERT INTO my_table (json_data) VALUES (%s)", (json.dumps(json_obj),))
    conn.commit()  # commit the transaction

    # count and print the rows after inserting
    cur.execute("SELECT count(*) FROM my_table")
    rows = cur.fetchall()
    for row in rows:
        print("rows after:", row[0])

    # # query the on-disk size of the table
    # cur.execute("SELECT pg_relation_size('my_table')")
    # rows = cur.fetchall()
    # size = 0
    # for row in rows:
    #     size = row[0]
    #     print("table size:", row[0])

    # close the cursor and the connection
    cur.close()
    conn.close()
def read_parquet_file(parquet_file):
    table = pq.read_table(parquet_file + ".parquet")
    df = table.to_pandas()
    print(df)


def read_avro_file(avro_file):
    with open(avro_file + ".avro", 'rb') as f:
        reader = fastavro.reader(f)
        for record in reader:
            print(record)


def read_json_file(json_file):
    with open(json_file + ".json", 'r') as f:
        data = json.load(f)
        print(data)
def main():
    key_length = 7
    key_sizes = 4000
    row_sizes = 10000
    file_name = "output"
    # cases = [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (0, 4)]
    cases = [(2, 2), (3, 3), (0, 4)]
    for data in cases:
        begin, end = data
        print(f"value type range: {begin}-{end}")
        N = 2
        for _ in range(N):
            t0 = time.time()
            keys = [generate_random_string(key_length) for _ in range(key_sizes)]
            values = [random.randint(begin, end) for _ in range(key_sizes)]
            # generate the JSON array
            json_array = generate_json_array(keys, values, row_sizes)
            t1 = time.time()
            write_json_file(file_name, json_array)
            t2 = time.time()
            write_parquet_file(file_name, json_array)
            t3 = time.time()
            write_avro_file(file_name, json_array, keys, values)
            t4 = time.time()
            write_pg_file(json_array)
            t5 = time.time()
            print("json generate+write time:", t2 - t0, "file size:", os.path.getsize(file_name + ".json"))
            print("parquet write time:", t3 - t2, "file size:", os.path.getsize(file_name + ".parquet"))
            print("avro write time:", t4 - t3, "file size:", os.path.getsize(file_name + ".avro"))
            print("pg json write time:", t5 - t4, "data size:", get_dir_size("/opt/homebrew/var/postgresql@14/base/16385") - 8 * 1024 * 1024)
            # read_json_file(file_name)
            # read_parquet_file(file_name)
            # read_avro_file(file_name)
        print(f"\n---------------\n")


if __name__ == "__main__":
    main()
# Compress the output files with LZ4 and compare sizes.
# import os
#
# import lz4.frame
#
#
# files = ["output.json", "output.parquet", "output.avro"]
#
#
# def compress_file(input_path, output_path):
#     with open(input_path, 'rb') as f_in:
#         compressed_data = lz4.frame.compress(f_in.read())
#
#     with open(output_path, 'wb') as f_out:
#         f_out.write(compressed_data)
#
#
# for file in files:
#     compress_file(file, file + ".lz4")
#     print(file, "origin size:", os.path.getsize(file), " after lz4 size:", os.path.getsize(file + ".lz4"))