From e1e5a7780cc500ed18c8ee696ce97fd6c954634d Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 5 Jul 2024 10:49:44 +0800 Subject: [PATCH 1/3] revise fill start and end timestamp --- source/libs/executor/src/filloperator.c | 36 +++++++++++++- tests/system-test/2-query/fill.py | 62 +++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4b71c5ee3f..1f03f27a0f 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -59,6 +59,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i static void destroyFillOperatorInfo(void* param); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void fillResetPrevForNewGroup(SFillInfo* pFillInfo); +static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order); static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, int32_t order) { @@ -74,7 +75,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp blockDataCleanup(pInfo->pRes); doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag); - revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order); + //revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order); + reviseFillStartAndEndKey(pOperator->info, order); int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey : pInfo->existNewGroupBlock->info.window.skey; @@ -258,7 +260,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) { if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) { - revisedFillStartKey(pInfo, pBlock, order); + //revisedFillStartKey(pInfo, pBlock, order); + reviseFillStartAndEndKey(pInfo, order); } pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block @@ -549,3 +552,32 @@ _error: taosMemoryFreeClear(pOperator); return code; } + +static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) { + int64_t skey, ekey, next; + if (order == TSDB_ORDER_ASC) { + skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); + + ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval); + next = ekey; + while (next < pInfo->win.ekey) { + next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit, + pInfo->pFillInfo->interval.precision); + ekey = next > pInfo->win.ekey ? ekey : next; + } + pInfo->win.ekey = ekey; + } else { + assert(order == TSDB_ORDER_DESC); + skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); + + next = skey; + while (next < pInfo->win.skey) { + next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit, + pInfo->pFillInfo->interval.precision); + skey = next > pInfo->win.skey ? 
skey : next; + } + pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval); + } +} diff --git a/tests/system-test/2-query/fill.py b/tests/system-test/2-query/fill.py index f5cd2d5855..274e0710bd 100644 --- a/tests/system-test/2-query/fill.py +++ b/tests/system-test/2-query/fill.py @@ -1,3 +1,5 @@ +import random +from pandas._libs import interval import taos import sys @@ -15,7 +17,67 @@ class TDTestCase: #tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), logSql) # output sql.txt file + def generate_fill_range(self, data_start: int, data_end: int, interval: int, step: int)-> list: + ret = [] + begin = data_start - 10 * interval + end = data_end + 10 * interval + for i in range(begin, end, step): + for j in range(begin, end, step): + ret.append((i,j)) + return ret + + def check_fill_range(self, where_start, where_end, res, sql: str): + if len(res) == 0: + tdLog.debug(f'fill sql got no rows {sql}') + return + if len(res) == 1: + tdLog.debug(f'fill sql got one row {sql}: {res}') + else: + first = res[0] + last = res[-1] + tdLog.debug(f'fill sql got rows {sql}: {res}') + + def generate_partition_by(self): + val = random.random() + if val < 0.6: + return "" + elif val < 0.8: + return "partition by location" + else: + return "partition by tbname" + + def generate_fill_interval(self)-> list[tuple]: + ret = [] + intervals = [1, 30, 60, 90, 120, 300, 3600] + for i in range(0, len(intervals)): + for j in range(0, i+1): + ret.append((intervals[i], intervals[j])) + return ret + + def generate_fill_sql(self, where_start, where_end, fill_interval: tuple) -> str: + partition_by = self.generate_partition_by() + where = f'ts >= {where_start} and ts < {where_end}' + return f'select _wstart, _wend, count(*) from meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)' + + def test_fill_range(self): + os.system('taosBenchmark -t 1000 -n 1000 -v 2 -S 32000 -y') + data_start = 1500000000 + data_end = 1500031968 + step = 100 + + fill_intervals: list[tuple] = self.generate_fill_interval() + fill_interval = random.choice(fill_intervals) + ranges = self.generate_fill_range(data_start, data_end, fill_interval[0], step) + range = random.choice(ranges) + sql = self.generate_fill_sql(range[0], range[1], fill_interval) + tdSql.query(sql) + res = tdSql.queryResult + self.check_fill_range(range[0], range[1], res, sql) + + ## tdSql.execute('drop database test') + def run(self): + self.test_fill_range() dbname = "db" tbname = "tb" From 0e868735beb99b8f3d9c40bf47b0a4934895a701 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 10 Jul 2024 19:10:17 +0800 Subject: [PATCH 2/3] fill add test --- tests/system-test/2-query/fill.py | 74 +++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/tests/system-test/2-query/fill.py b/tests/system-test/2-query/fill.py index 274e0710bd..8ea4622830 100644 --- a/tests/system-test/2-query/fill.py +++ b/tests/system-test/2-query/fill.py @@ -1,8 +1,10 @@ import random +from fabric2.runners import threading from pandas._libs import interval import taos import sys +from util.common import TDCom from util.log import * from util.sql import * from util.cases import * @@ -26,16 +28,20 @@ class TDTestCase: ret.append((i,j)) return ret - def check_fill_range(self, where_start, where_end, res, sql: str): - if len(res) == 0: - tdLog.debug(f'fill sql got no rows {sql}') + def check_fill_range(self, where_start, where_end, res_asc, res_desc, sql: str, interval): + if 
len(res_asc) != len(res_desc): + tdLog.exit(f"err, asc desc with different rows, asc: {len(res_asc)}, desc: {len(res_desc)} sql: {sql}") + if len(res_asc) == 0: + tdLog.info(f'from {where_start} to {where_end} no rows returned') return - if len(res) == 1: - tdLog.debug(f'fill sql got one row {sql}: {res}') + asc_first = res_asc[0] + asc_last = res_asc[-1] + desc_first = res_desc[0] + desc_last = res_desc[-1] + if asc_first[0] != desc_last[0] or asc_last[0] != desc_first[0]: + tdLog.exit(f'fill sql different row data {sql}: asc<{asc_first[0].timestamp()}, {asc_last[0].timestamp()}>, desc<{desc_last[0].timestamp()}, {desc_first[0].timestamp()}>') else: - first = res[0] - last = res[-1] - tdLog.debug(f'fill sql got rows {sql}: {res}') + tdLog.info(f'from {where_start} to {where_end} same time returned asc<{asc_first[0].timestamp()}, {asc_last[0].timestamp()}>, desc<{desc_last[0].timestamp()}, {desc_first[0].timestamp()}> interval: {interval}') def generate_partition_by(self): val = random.random() @@ -48,36 +54,58 @@ class TDTestCase: def generate_fill_interval(self)-> list[tuple]: ret = [] - intervals = [1, 30, 60, 90, 120, 300, 3600] + intervals = [60, 90, 120, 300, 3600] for i in range(0, len(intervals)): for j in range(0, i+1): ret.append((intervals[i], intervals[j])) return ret - def generate_fill_sql(self, where_start, where_end, fill_interval: tuple) -> str: + def generate_fill_sql(self, where_start, where_end, fill_interval: tuple): partition_by = self.generate_partition_by() - where = f'ts >= {where_start} and ts < {where_end}' - return f'select _wstart, _wend, count(*) from meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)' + where = f'where ts >= {where_start} and ts < {where_end}' + sql = f'select _wstart, _wend, count(*) from test.meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)' + sql_asc = sql + " order by _wstart asc" + sql_desc = sql + " order by _wstart desc" + return sql_asc, sql_desc + + def fill_test_thread_routine(self, cli: TDSql, interval, data_start, data_end, step): + ranges = self.generate_fill_range(data_start, data_end, interval[0], step) + for range in ranges: + sql_asc, sql_desc = self.generate_fill_sql(range[0], range[1], interval) + cli.query(sql_asc, queryTimes=1) + asc_res = cli.queryResult + cli.query(sql_desc, queryTimes=1) + desc_res = cli.queryResult + self.check_fill_range(range[0], range[1], asc_res,desc_res , sql_asc, interval) def test_fill_range(self): - os.system('taosBenchmark -t 1000 -n 1000 -v 2 -S 32000 -y') - data_start = 1500000000 - data_end = 1500031968 - step = 100 + os.system('taosBenchmark -t 10 -n 10000 -v 8 -S 32000 -y') + data_start = 1500000000000 + data_end = 1500319968000 + step = 2000000 + tdCom = TDCom() + inses: list[TDSql] = [] + threads: list[threading.Thread] = [] fill_intervals: list[tuple] = self.generate_fill_interval() - fill_interval = random.choice(fill_intervals) - ranges = self.generate_fill_range(data_start, data_end, fill_interval[0], step) - range = random.choice(ranges) - sql = self.generate_fill_sql(range[0], range[1], fill_interval) - tdSql.query(sql) - res = tdSql.queryResult - self.check_fill_range(range[0], range[1], res, sql) + for interval in fill_intervals: + ins = tdCom.newTdSql() + t = threading.Thread(target=self.fill_test_thread_routine, args=(ins, interval, data_start, data_end, step)) + t.start() + inses.append(ins) + threads.append(t) + + for t in threads: + t.join() + + for ins in inses: + 
ins.close() ## tdSql.execute('drop database test') def run(self): self.test_fill_range() return dbname = "db" tbname = "tb"
From 2c3cddb8b4f564202963091241c42ca1431157d0 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 11 Jul 2024 15:59:53 +0800 Subject: [PATCH 3/3] add tests for fix fill asc/desc --- docs/en/12-taos-sql/12-distinguished.md | 1 + docs/zh/12-taos-sql/12-distinguished.md | 1 + source/libs/executor/src/filloperator.c | 5 +- tests/system-test/2-query/fill.py | 68 +++++++++++++++-------- tests/system-test/2-query/test_td28163.py | 2 +- 5 files changed, 50 insertions(+), 27 deletions(-)
diff --git a/docs/en/12-taos-sql/12-distinguished.md b/docs/en/12-taos-sql/12-distinguished.md index bfc9ca32c0..8eecb706c0 100644 --- a/docs/en/12-taos-sql/12-distinguished.md +++ b/docs/en/12-taos-sql/12-distinguished.md @@ -102,6 +102,7 @@ The detailed beaviors of `NULL`, `NULL_F`, `VALUE`, and VALUE_F are described be 1. A huge volume of interpolation output may be returned using `FILL`, so it's recommended to specify the time range when using `FILL`. The maximum number of interpolation values that can be returned in a single query is 10,000,000. 2. The result set is in ascending order of timestamp when you aggregate by time window. 3. If aggregate by window is used on STable, the aggregate function is performed on all the rows matching the filter conditions. If `PARTITION BY` is not used in the query, the result set will be returned in strict ascending order of timestamp; otherwise the result set will be returned in the order of ascending timestamp in each group. +4. The output windows of FILL are determined by the time range of the WHERE clause. For ascending fill, the first output window is the first window that contains the start time of the WHERE clause, and the last output window is the last window that contains the end time of the WHERE clause. :::
diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index 0eaeb0dfa7..50bf36d2e1 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -97,6 +97,7 @@ NULL, NULL_F, VALUE, VALUE_F 这几种填充模式针对不同场景区别如下 1. 使用 FILL 语句的时候可能生成大量的填充输出，务必指定查询的时间区间。针对每次查询，系统可返回不超过 1 千万条具有插值的结果。 2. 在时间维度聚合中，返回的结果中时间序列严格单调递增。 3. 如果查询对象是超级表，则聚合函数会作用于该超级表下满足值过滤条件的所有表的数据。如果查询中没有使用 PARTITION BY 语句，则返回的结果按照时间序列严格单调递增；如果查询中使用了 PARTITION BY 语句分组，则返回结果中每个 PARTITION 内按照时间序列严格单调递增。 +4. Fill输出的起始和结束窗口与WHERE条件的时间范围有关, 如增序Fill时, 第一个窗口是包含WHERE条件开始时间的第一个窗口, 最后一个窗口是包含WHERE条件结束时间的最后一个窗口。 :::
diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 1f03f27a0f..c4ef74608a 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -75,7 +75,6 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp blockDataCleanup(pInfo->pRes); doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag); - //revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order); reviseFillStartAndEndKey(pOperator->info, order); int64_t ts = (order == TSDB_ORDER_ASC) ?
pInfo->existNewGroupBlock->info.window.ekey @@ -260,7 +259,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) { if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) { - //revisedFillStartKey(pInfo, pBlock, order); reviseFillStartAndEndKey(pInfo, order); } @@ -570,14 +568,13 @@ static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) { } else { assert(order == TSDB_ORDER_DESC); skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval); - taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); - next = skey; while (next < pInfo->win.skey) { next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit, pInfo->pFillInfo->interval.precision); skey = next > pInfo->win.skey ? skey : next; } + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval); } } diff --git a/tests/system-test/2-query/fill.py b/tests/system-test/2-query/fill.py index 8ea4622830..64a43bd80a 100644 --- a/tests/system-test/2-query/fill.py +++ b/tests/system-test/2-query/fill.py @@ -1,3 +1,4 @@ +import queue import random from fabric2.runners import threading from pandas._libs import interval @@ -12,6 +13,7 @@ from util.cases import * class TDTestCase: + updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'numOfVnodeQueryThreads': 80} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -19,7 +21,7 @@ class TDTestCase: #tdSql.init(conn.cursor()) tdSql.init(conn.cursor(), logSql) # output sql.txt file - def generate_fill_range(self, data_start: int, data_end: int, interval: int, step: int)-> list: + def generate_fill_range(self, data_start: int, data_end: int, interval: int, step: int): ret = [] begin = data_start - 10 * interval end = data_end + 10 * interval @@ -52,9 +54,10 @@ class TDTestCase: else: return "partition by tbname" - def generate_fill_interval(self)-> list[tuple]: + def generate_fill_interval(self): ret = [] - intervals = [60, 90, 120, 300, 3600] + #intervals = [60, 90, 120, 300, 3600] + intervals = [120, 300, 3600] for i in range(0, len(intervals)): for j in range(0, i+1): ret.append((intervals[i], intervals[j])) @@ -63,9 +66,9 @@ class TDTestCase: def generate_fill_sql(self, where_start, where_end, fill_interval: tuple): partition_by = self.generate_partition_by() where = f'where ts >= {where_start} and ts < {where_end}' - sql = f'select _wstart, _wend, count(*) from test.meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)' - sql_asc = sql + " order by _wstart asc" - sql_desc = sql + " order by _wstart desc" + sql = f'select first(_wstart), last(_wstart) from (select _wstart, _wend, count(*) from test.meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)' + sql_asc = sql + " order by _wstart asc) t" + sql_desc = sql + " order by _wstart desc) t" return sql_asc, sql_desc def fill_test_thread_routine(self, cli: TDSql, interval, data_start, data_end, step): @@ -78,34 +81,55 @@ class TDTestCase: desc_res = cli.queryResult self.check_fill_range(range[0], range[1], asc_res,desc_res , sql_asc, interval) - def test_fill_range(self): - os.system('taosBenchmark -t 10 -n 10000 -v 8 -S 32000 -y') + def fill_test_task_routine(self, tdCom: TDCom, queue: queue.Queue): + cli = tdCom.newTdSql() + 
while True: + m: list = queue.get() + if len(m) == 0: + break + interval = m[0] + range = m[1] + sql_asc, sql_desc = self.generate_fill_sql(range[0], range[1], interval) + cli.query(sql_asc, queryTimes=1) + asc_res = cli.queryResult + cli.query(sql_desc, queryTimes=1) + desc_res = cli.queryResult + self.check_fill_range(range[0], range[1], asc_res,desc_res , sql_asc, interval) + cli.close() + + def schedule_fill_test_tasks(self): + num: int = 20 + threads = [] + tdCom = TDCom() + q: queue.Queue = queue.Queue() + for _ in range(num): + t = threading.Thread(target=self.fill_test_task_routine, args=(tdCom, q)) + t.start() + threads.append(t) + data_start = 1500000000000 data_end = 1500319968000 - step = 2000000 - tdCom = TDCom() - inses: list[TDSql] = [] - threads: list[threading.Thread] = [] + step = 30000000 fill_intervals: list[tuple] = self.generate_fill_interval() for interval in fill_intervals: - ins = tdCom.newTdSql() - t = threading.Thread(target=self.fill_test_thread_routine, args=(ins, interval, data_start, data_end, step)) - t.start() - inses.append(ins) - threads.append(t) + ranges = self.generate_fill_range(data_start, data_end, interval[0], step) + for r in ranges: + q.put([interval, r]) + + for _ in range(num): + q.put([]) for t in threads: t.join() - for ins in inses: - ins.close() - - ## tdSql.execute('drop database test') + def test_fill_range(self): + os.system('taosBenchmark -t 10 -n 10000 -v 8 -S 32000 -y') + self.schedule_fill_test_tasks() + tdSql.execute('drop database test') def run(self): self.test_fill_range() - return dbname = "db" tbname = "tb" diff --git a/tests/system-test/2-query/test_td28163.py b/tests/system-test/2-query/test_td28163.py index a101549b66..005d78d075 100644 --- a/tests/system-test/2-query/test_td28163.py +++ b/tests/system-test/2-query/test_td28163.py @@ -176,7 +176,7 @@ class TDTestCase: def test_query_with_window(self): # time window tdSql.query("select sum(c_int_empty) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m) fill(value, 10);") - tdSql.checkRows(841) + tdSql.checkRows(845) tdSql.checkData(0, 0, 10) tdSql.query("select _wstart, _wend, sum(c_int) from st where ts > '2024-01-01 00:00:00.000' and ts <= '2024-01-01 14:00:00.000' interval(5m) sliding(1m);")
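
Note (illustration only, not part of the patches): the window alignment that patch 1 introduces and the new documentation bullet describes can be summarized by the Python sketch below. This is a simplified model rather than TDengine code: timestamps are plain integers, taosTimeTruncate is reduced to flooring onto an epoch-aligned interval boundary (ignoring offsets, calendar units and timestamp precision), only the ascending case is shown, and the helper name revise_fill_window is made up for this note.

def revise_fill_window(where_start: int, where_end: int,
                       interval: int, sliding: int) -> tuple:
    """Return (skey, ekey): the start timestamps of the first and the last
    output window for a filter of `ts >= where_start and ts < where_end`."""
    # First output window: truncate the WHERE start down to an interval
    # boundary, so the window containing the start time is the first emitted.
    skey = (where_start // interval) * interval

    # Last output window: truncate the WHERE end, then step forward by the
    # sliding value while the next step still lies at or before the WHERE end,
    # mirroring the while-loop in reviseFillStartAndEndKey for ascending fill.
    ekey = (where_end // interval) * interval
    nxt = ekey
    while nxt < where_end:
        nxt = ekey + sliding
        ekey = ekey if nxt > where_end else nxt
    return skey, ekey

if __name__ == '__main__':
    # interval(300s) sliding(120s), where ts >= 1500000100 and ts < 1500000250:
    # prints (1500000000, 1500000240) -- the first window contains the start of
    # the range, the last window is the latest sliding step containing its end.
    print(revise_fill_window(1500000100, 1500000250, 300, 120))

For descending fill the C code aligns skey with the same stepping and, as of patch 3, calls taosFillUpdateStartTimestampInfo only after that alignment loop, which is the behaviour the new asc/desc comparison test exercises.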