Merge pull request #29707 from taosdata/fix/TS-5946/lfjoin

Fix/ts 5946/lfjoin
This commit is contained in:
Shengliang Guan 2025-02-11 14:09:26 +08:00 committed by GitHub
commit 0bc253b885
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 122 additions and 4 deletions

View File

@ -2219,7 +2219,7 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdf
break;
}
}
};
}
taosMemoryFree(task);
return code;
}

View File

@ -768,7 +768,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->id.queryId = pCxt->queryId;
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
//pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
code = stbSplCreatePartWindowNode((SWindowLogicNode*)pSubplan->pNode, &pPartWindow);
if (TSDB_CODE_SUCCESS == code) {
@ -892,7 +892,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
if (TSDB_CODE_SUCCESS == code) {
stbSplSetTableMergeScan(pChild);
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
//SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
++(pCxt->groupId);
} else {
nodesDestroyList(pMergeKeys);
@ -1201,7 +1201,7 @@ static int32_t stbSplSplitAggNodeForCrossTableMulSubplan(SSplitContext* pCxt, SS
SLogicSubplan* pSubplan = (SLogicSubplan*)pNode;
pSubplan->id.groupId = pCxt->groupId;
pSubplan->id.queryId = pCxt->queryId;
pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
//pSubplan->splitFlag = SPLIT_FLAG_STABLE_SPLIT;
splSetSubplanVgroups(pSubplan, pSubplan->pNode);
code = stbSplCreatePartAggNode((SAggLogicNode*)pSubplan->pNode, &pPartAgg);
if (code) break;

View File

@ -170,6 +170,122 @@ class TDTestCase(TBase):
tdSql.checkData(0, 1, 2)
tdSql.checkData(2, 1, 1)
def ts5946(self):
tdLog.info("check bug TD_xx ...\n")
sqls = [
"drop database if exists ctg_tsdb",
"create database ctg_tsdb cachemodel 'both' stt_trigger 1;",
"use ctg_tsdb;",
"CREATE STABLE `stb_sxny_cn` (`dt` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', \
`val` DOUBLE ENCODE 'delta-d' COMPRESS 'tsz' LEVEL 'medium') TAGS (`point` VARCHAR(50), \
`point_name` VARCHAR(64), `point_path` VARCHAR(2000), `index_name` VARCHAR(64), \
`country_equipment_code` VARCHAR(64), `index_code` VARCHAR(64), `ps_code` VARCHAR(50), \
`cnstationno` VARCHAR(255), `index_level` VARCHAR(10), `cz_flag` VARCHAR(255), \
`blq_flag` VARCHAR(255), `dcc_flag` VARCHAR(255))",
"CREATE STABLE `stb_popo_power_station_all` (`ts` TIMESTAMP ENCODE 'delta-i' COMPRESS 'lz4' LEVEL 'medium', \
`assemble_capacity` DOUBLE ENCODE 'delta-d' COMPRESS 'lz4' LEVEL 'medium', `ps_status` DOUBLE ENCODE \
'delta-d' COMPRESS 'lz4' LEVEL 'medium') TAGS (`ps_type` VARCHAR(255), `ps_type_code` VARCHAR(255), \
`belorg_name` VARCHAR(255), `belorg_code` VARCHAR(255), `country_code` VARCHAR(255), `country_name` \
VARCHAR(255), `area_name` VARCHAR(255), `area_code` VARCHAR(255), `ps_name` VARCHAR(255), `ps_code` \
VARCHAR(255), `ps_aab` VARCHAR(255), `ps_type_sec_lvl` VARCHAR(255), `ps_type_sec_lvl_name` VARCHAR(255), \
`ps_type_name` VARCHAR(255), `longitude` DOUBLE, `latitude` DOUBLE, `is_china` VARCHAR(255), `is_access` \
VARCHAR(255), `first_put_production_date` VARCHAR(255), `all_put_production_date` VARCHAR(255), `merge_date` \
VARCHAR(255), `sold_date` VARCHAR(255), `cap_detail` VARCHAR(500), `ps_unit` VARCHAR(255), `region_name` \
VARCHAR(255))",
]
tdSql.executes(sqls)
ts = 1657146000000
# create subtable and insert data for super table stb_sxny_cn
for i in range(1, 1000):
sql = f"CREATE TABLE `stb_sxny_cn_{i}` USING `stb_sxny_cn` (point, point_name, point_path, index_name, country_equipment_code, \
index_code, ps_code, cnstationno, index_level, cz_flag, blq_flag, dcc_flag) TAGS('point{i}', 'point_name{i}', 'point_path{i}', 'index_name{i}', \
'country_equipment_code{i}', 'index_code{i}', 'ps_code{i%500}', 'cnstationno{i}', '{i}', 'cz_flag{i}', 'blq_flag{i}', 'dcc_flag{i}');"
tdSql.execute(sql)
sql = f"INSERT INTO `stb_sxny_cn_{i}` VALUES "
values = []
for j in range(1, 100):
values.append(f"({ts+(i%5)*86400000 + j}, {i%500 + j/20})")
sql += ", ".join(values)
tdSql.execute(sql)
tdLog.debug(f"create table stb_sxny_cn_{i} and insert data successfully")
# create subtable and insert data for super table stb_popo_power_station_all
for i in range(1, 1000):
sql = f"CREATE TABLE `stb_popo_power_station_all_{i}` USING `stb_popo_power_station_all` (ps_type, ps_type_code, belorg_name, belorg_code, \
country_code, country_name, area_name, area_code, ps_name, ps_code, ps_aab, ps_type_sec_lvl, ps_type_sec_lvl_name, ps_type_name, \
longitude, latitude, is_china, is_access, first_put_production_date, all_put_production_date, merge_date, sold_date, cap_detail, ps_unit, \
region_name) TAGS ('ps_type{i}', 'ps_type_code{i}', 'belorg_name{i}', 'belorg_code{i}', 'country_code{i}', 'country_name{i}', 'area_name{i}', \
'area_code{i}', 'ps_name{i}', 'ps_code{i}', 'ps_aab{i}', 'ps_type_sec_lvl{i}', 'ps_type_sec_lvl_name{i}', 'ps_type_name{i}', {i}, \
{i}, 'is_china{i}', 'is_access{i}', 'first_put_production_date{i}', 'all_put_production_date{i}', 'merge_date{i}', 'sold_date{i}', \
'cap_detail{i}', 'ps_unit{i}', 'region_name{i}');"
tdSql.execute(sql)
sql = f"INSERT INTO `stb_popo_power_station_all_{i}` VALUES "
values = []
for j in range(1, 6):
values.append(f"({ts+(j-1)*86400000}, {i*10 + j%10}, {j})")
sql += ", ".join(values)
tdSql.execute(sql)
tdLog.debug(f"create table stb_popo_power_station_all_{i} and insert data successfully")
for i in range(1, 499, 20):
pscode = f"ps_code{i}"
querySql = f"select t2.ts ,tt.ps_code,t2.ps_code from \
( select TIMETRUNCATE(t1.dt, 1d, 1) dt, t1.ps_code, first(dt) \
from ctg_tsdb.stb_sxny_cn t1 where ps_code<>'0' and dt >= '2022-07-07 00:00:00.000' \
and t1.ps_code='{pscode}' partition by point state_window(cast(val as int)) order by \
TIMETRUNCATE(t1.dt, 1d, 0) ) tt \
left join ctg_tsdb.stb_popo_power_station_all t2 \
on TIMETRUNCATE(tt.dt, 1d, 1)=TIMETRUNCATE(t2.ts, 1d, 1) \
and tt.ps_code = t2.ps_code "
tdSql.query(querySql)
tdSql.checkData(0, 1, pscode)
tdSql.checkData(0, 2, pscode)
tdLog.debug(f"execute sql: {pscode}")
querySql = f"select t2.ts ,tt.ps_code,t2.ps_code from ( select last(t1.dt) dt, t1.ps_code, first(dt) \
from ctg_tsdb.stb_sxny_cn t1 where ps_code<>'0' and dt >= '2022-07-07 00:00:00.000' and \
t1.ps_code='{pscode}' group by tbname order by dt) tt left join \
ctg_tsdb.stb_popo_power_station_all t2 on TIMETRUNCATE(tt.dt, 1d, 1)=TIMETRUNCATE(t2.ts, 1d, 1) \
and tt.ps_code = t2.ps_code"
tdSql.query(querySql)
tdSql.checkData(0, 1, pscode)
tdSql.checkData(0, 2, pscode)
tdLog.debug(f"execute sql: {pscode}")
querySql = f"select t2.ts ,tt.ps_code,t2.ps_code from ( select last(t1.dt) dt, last(ps_code) ps_code \
from ctg_tsdb.stb_sxny_cn t1 where ps_code<>'0' and dt >= '2022-07-07 00:00:00.000' and \
t1.ps_code='{pscode}' order by dt) tt left join ctg_tsdb.stb_popo_power_station_all t2 on \
TIMETRUNCATE(tt.dt, 1d, 1)=TIMETRUNCATE(t2.ts, 1d, 1) and tt.ps_code = t2.ps_code"
tdSql.query(querySql)
tdSql.checkData(0, 1, pscode)
tdSql.checkData(0, 2, pscode)
tdLog.debug(f"execute sql: {pscode}")
querySql = f"select t2.ts ,tt.ps_code,t2.ps_code from ( select _wstart dt, t1.ps_code, first(dt) \
from ctg_tsdb.stb_sxny_cn t1 where ps_code<>'0' and dt >= '2022-07-07 00:00:00.000' and \
t1.ps_code='{pscode}' interval(1m) order by dt) tt left join ctg_tsdb.stb_popo_power_station_all t2 \
on TIMETRUNCATE(tt.dt, 1d, 1)=TIMETRUNCATE(t2.ts, 1d, 1) and tt.ps_code = t2.ps_code"
tdSql.query(querySql)
tdSql.checkData(0, 1, pscode)
tdSql.checkData(0, 2, pscode)
tdLog.debug(f"execute sql: {pscode}")
querySql = f"select t2.ts ,tt.ps_code,t2.ps_code from (select first(dt) dt, t1.ps_code from \
ctg_tsdb.stb_sxny_cn t1 where ps_code<>'0' and dt >= '2022-07-07 00:00:00.000' and t1.ps_code='{pscode}' \
session(dt, 1m) order by dt) tt left join ctg_tsdb.stb_popo_power_station_all t2 on \
TIMETRUNCATE(tt.dt, 1d, 1)=TIMETRUNCATE(t2.ts, 1d, 1) and tt.ps_code = t2.ps_code"
tdSql.query(querySql)
tdSql.checkData(0, 1, pscode)
tdSql.checkData(0, 2, pscode)
tdLog.debug(f"execute sql: {pscode}")
def FIX_TS_5984(self):
tdLog.info("check bug TS_5984 ...\n")
# prepare data
@ -213,6 +329,8 @@ class TDTestCase(TBase):
def run(self):
tdLog.debug(f"start to excute {__file__}")
self.ts5946()
# TD BUGS
self.FIX_TD_30686()
self.FIX_TD_31684()