From e1e5a7780cc500ed18c8ee696ce97fd6c954634d Mon Sep 17 00:00:00 2001
From: wangjiaming0909 <604227650@qq.com>
Date: Fri, 5 Jul 2024 10:49:44 +0800
Subject: [PATCH] revise fill start and end timestamp

---
 source/libs/executor/src/filloperator.c | 39 ++++++++++++++-
 tests/system-test/2-query/fill.py       | 68 ++++++++++++++++++++++++
 2 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c
index 4b71c5ee3f..1f03f27a0f 100644
--- a/source/libs/executor/src/filloperator.c
+++ b/source/libs/executor/src/filloperator.c
@@ -59,6 +59,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i
 static void destroyFillOperatorInfo(void* param);
 static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
 static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
+static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
 
 static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo,
                                                int32_t order) {
@@ -74,7 +75,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
   blockDataCleanup(pInfo->pRes);
   doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
 
-  revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order);
+  //revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order);
+  reviseFillStartAndEndKey(pOperator->info, order);
 
   int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
                                          : pInfo->existNewGroupBlock->info.window.skey;
@@ -258,7 +260,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
 
     if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
       if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
-        revisedFillStartKey(pInfo, pBlock, order);
+        //revisedFillStartKey(pInfo, pBlock, order);
+        reviseFillStartAndEndKey(pInfo, order);
       }
 
       pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
@@ -549,3 +552,35 @@ _error:
   taosMemoryFreeClear(pOperator);
   return code;
 }
+
+// Align the fill range to the interval settings before filling begins.
+// ASC:  fill start = truncated win.skey; win.ekey = truncated win.ekey advanced by sliding steps while <= win.ekey.
+// DESC: fill start = truncated win.skey advanced by sliding steps while <= win.skey; win.ekey is only truncated.
+static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
+  int64_t skey, ekey, next;
+  if (order == TSDB_ORDER_ASC) {
+    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
+    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
+
+    ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
+    next = ekey;
+    while (next < pInfo->win.ekey) {
+      next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
+                         pInfo->pFillInfo->interval.precision);
+      ekey = next > pInfo->win.ekey ? ekey : next;
+    }
+    pInfo->win.ekey = ekey;
+  } else {
+    assert(order == TSDB_ORDER_DESC);
+    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
+    next = skey;
+    while (next < pInfo->win.skey) {
+      next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
+                         pInfo->pFillInfo->interval.precision);
+      skey = next > pInfo->win.skey ? skey : next;
+    }
+    // publish the start key only after the loop has advanced it; otherwise the loop result is never used
+    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
+    pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
+  }
+}
diff --git a/tests/system-test/2-query/fill.py b/tests/system-test/2-query/fill.py
index f5cd2d5855..274e0710bd 100644
--- a/tests/system-test/2-query/fill.py
+++ b/tests/system-test/2-query/fill.py
@@ -1,3 +1,5 @@
+import random
+from pandas._libs import interval  # NOTE(review): looks unused (only shadowed by parameters below) - confirm and drop
 import taos
 import sys
 
@@ -15,7 +17,73 @@ class TDTestCase:
         #tdSql.init(conn.cursor())
         tdSql.init(conn.cursor(), logSql)  # output sql.txt file
 
+    def generate_fill_range(self, data_start: int, data_end: int, interval: int, step: int)-> list:
+        """Return all (start, end) pairs on a step grid from data_start - 10*interval up to data_end + 10*interval."""
+        ret = []
+        begin = data_start - 10 * interval
+        end = data_end + 10 * interval
+        for i in range(begin, end, step):
+            for j in range(begin, end, step):
+                ret.append((i,j))
+        return ret
+
+    def check_fill_range(self, where_start, where_end, res, sql: str):
+        """Log the rows returned by a fill query; no assertions are made yet."""
+        if len(res) == 0:
+            tdLog.debug(f'fill sql got no rows {sql}')
+            return
+        if len(res) == 1:
+            tdLog.debug(f'fill sql got one row {sql}: {res}')
+        else:
+            first = res[0]
+            last = res[-1]
+            tdLog.debug(f'fill sql got rows {sql}: {res}')
+
+    def generate_partition_by(self):
+        """Randomly pick a partition clause: none (60%), by location (20%) or by tbname (20%)."""
+        val = random.random()
+        if val < 0.6:
+            return ""
+        elif val < 0.8:
+            return "partition by location"
+        else:
+            return "partition by tbname"
+
+    def generate_fill_interval(self)-> list[tuple]:
+        """Return (interval, sliding) pairs in seconds where sliding never exceeds interval."""
+        ret = []
+        intervals = [1, 30, 60, 90, 120, 300, 3600]
+        for i in range(0, len(intervals)):
+            for j in range(0, i+1):
+                ret.append((intervals[i], intervals[j]))
+        return ret
+
+    def generate_fill_sql(self, where_start, where_end, fill_interval: tuple) -> str:
+        """Build an interval/sliding fill(NULL) count query over ts in [where_start, where_end)."""
+        partition_by = self.generate_partition_by()
+        where = f'where ts >= {where_start} and ts < {where_end}'
+        return f'select _wstart, _wend, count(*) from test.meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)'
+
+    def test_fill_range(self):
+        """Run one randomly chosen fill query against taosBenchmark data and log the result."""
+        os.system('taosBenchmark -t 1000 -n 1000 -v 2 -S 32000 -y')
+        data_start = 1500000000
+        data_end = 1500031968
+        step = 100
+
+        fill_intervals: list[tuple] = self.generate_fill_interval()
+        fill_interval = random.choice(fill_intervals)
+        ranges = self.generate_fill_range(data_start, data_end, fill_interval[0], step)
+        fill_range = random.choice(ranges)
+        sql = self.generate_fill_sql(fill_range[0], fill_range[1], fill_interval)
+        tdSql.query(sql)
+        res = tdSql.queryResult
+        self.check_fill_range(fill_range[0], fill_range[1], res, sql)
+
+        ## tdSql.execute('drop database test')
+
     def run(self):
+        self.test_fill_range()
         dbname = "db"
         tbname = "tb"
 