revise fill start and end timestamp
This commit is contained in:
parent
51d3422b4b
commit
e1e5a7780c
|
@ -59,6 +59,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, i
|
||||||
static void destroyFillOperatorInfo(void* param);
|
static void destroyFillOperatorInfo(void* param);
|
||||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
|
||||||
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
|
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
|
||||||
|
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
|
||||||
|
|
||||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||||
SResultInfo* pResultInfo, int32_t order) {
|
SResultInfo* pResultInfo, int32_t order) {
|
||||||
|
@ -74,7 +75,8 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
|
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
|
||||||
|
|
||||||
revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order);
|
//revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order);
|
||||||
|
reviseFillStartAndEndKey(pOperator->info, order);
|
||||||
|
|
||||||
int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
|
int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
|
||||||
: pInfo->existNewGroupBlock->info.window.skey;
|
: pInfo->existNewGroupBlock->info.window.skey;
|
||||||
|
@ -258,7 +260,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
|
if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
|
||||||
if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
|
if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
|
||||||
revisedFillStartKey(pInfo, pBlock, order);
|
//revisedFillStartKey(pInfo, pBlock, order);
|
||||||
|
reviseFillStartAndEndKey(pInfo, order);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block
|
pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block
|
||||||
|
@ -549,3 +552,32 @@ _error:
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
|
||||||
|
int64_t skey, ekey, next;
|
||||||
|
if (order == TSDB_ORDER_ASC) {
|
||||||
|
skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
|
||||||
|
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
|
||||||
|
|
||||||
|
ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
|
||||||
|
next = ekey;
|
||||||
|
while (next < pInfo->win.ekey) {
|
||||||
|
next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
|
||||||
|
pInfo->pFillInfo->interval.precision);
|
||||||
|
ekey = next > pInfo->win.ekey ? ekey : next;
|
||||||
|
}
|
||||||
|
pInfo->win.ekey = ekey;
|
||||||
|
} else {
|
||||||
|
assert(order == TSDB_ORDER_DESC);
|
||||||
|
skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
|
||||||
|
taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
|
||||||
|
|
||||||
|
next = skey;
|
||||||
|
while (next < pInfo->win.skey) {
|
||||||
|
next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
|
||||||
|
pInfo->pFillInfo->interval.precision);
|
||||||
|
skey = next > pInfo->win.skey ? skey : next;
|
||||||
|
}
|
||||||
|
pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import random
|
||||||
|
from pandas._libs import interval
|
||||||
import taos
|
import taos
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
@ -15,7 +17,67 @@ class TDTestCase:
|
||||||
#tdSql.init(conn.cursor())
|
#tdSql.init(conn.cursor())
|
||||||
tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
|
def generate_fill_range(self, data_start: int, data_end: int, interval: int, step: int)-> list:
|
||||||
|
ret = []
|
||||||
|
begin = data_start - 10 * interval
|
||||||
|
end = data_end + 10 * interval
|
||||||
|
for i in range(begin, end, step):
|
||||||
|
for j in range(begin, end, step):
|
||||||
|
ret.append((i,j))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def check_fill_range(self, where_start, where_end, res, sql: str):
|
||||||
|
if len(res) == 0:
|
||||||
|
tdLog.debug(f'fill sql got no rows {sql}')
|
||||||
|
return
|
||||||
|
if len(res) == 1:
|
||||||
|
tdLog.debug(f'fill sql got one row {sql}: {res}')
|
||||||
|
else:
|
||||||
|
first = res[0]
|
||||||
|
last = res[-1]
|
||||||
|
tdLog.debug(f'fill sql got rows {sql}: {res}')
|
||||||
|
|
||||||
|
def generate_partition_by(self):
|
||||||
|
val = random.random()
|
||||||
|
if val < 0.6:
|
||||||
|
return ""
|
||||||
|
elif val < 0.8:
|
||||||
|
return "partition by location"
|
||||||
|
else:
|
||||||
|
return "partition by tbname"
|
||||||
|
|
||||||
|
def generate_fill_interval(self)-> list[tuple]:
|
||||||
|
ret = []
|
||||||
|
intervals = [1, 30, 60, 90, 120, 300, 3600]
|
||||||
|
for i in range(0, len(intervals)):
|
||||||
|
for j in range(0, i+1):
|
||||||
|
ret.append((intervals[i], intervals[j]))
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def generate_fill_sql(self, where_start, where_end, fill_interval: tuple) -> str:
|
||||||
|
partition_by = self.generate_partition_by()
|
||||||
|
where = f'ts >= {where_start} and ts < {where_end}'
|
||||||
|
return f'select _wstart, _wend, count(*) from meters {where} {partition_by} interval({fill_interval[0]}s) sliding({fill_interval[1]}s) fill(NULL)'
|
||||||
|
|
||||||
|
def test_fill_range(self):
|
||||||
|
os.system('taosBenchmark -t 1000 -n 1000 -v 2 -S 32000 -y')
|
||||||
|
data_start = 1500000000
|
||||||
|
data_end = 1500031968
|
||||||
|
step = 100
|
||||||
|
|
||||||
|
fill_intervals: list[tuple] = self.generate_fill_interval()
|
||||||
|
fill_interval = random.choice(fill_intervals)
|
||||||
|
ranges = self.generate_fill_range(data_start, data_end, fill_interval[0], step)
|
||||||
|
range = random.choice(ranges)
|
||||||
|
sql = self.generate_fill_sql(range[0], range[1], fill_interval)
|
||||||
|
tdSql.query(sql)
|
||||||
|
res = tdSql.queryResult
|
||||||
|
self.check_fill_range(range[0], range[1], res, sql)
|
||||||
|
|
||||||
|
## tdSql.execute('drop database test')
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self.test_fill_range()
|
||||||
dbname = "db"
|
dbname = "db"
|
||||||
tbname = "tb"
|
tbname = "tb"
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue