Merge pull request #3632 from taosdata/bugfix/td-1262
[td-1262]<fix>: stream support history data (#3275)
This commit is contained in:
commit
9ec42d055c
|
@ -136,7 +136,6 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
etime = pStream->stime + (etime - pStream->stime) / pStream->interval.interval * pStream->interval.interval;
|
||||
} else {
|
||||
etime = taosTimeTruncate(etime, &pStream->interval, pStream->precision);
|
||||
//etime = taosGetIntervalStartTimestamp(etime, pStream->interval.sliding, pStream->interval.sliding, pStream->interval.slidingUnit, pStream->precision);
|
||||
}
|
||||
pQueryInfo->window.ekey = etime;
|
||||
if (pQueryInfo->window.skey >= pQueryInfo->window.ekey) {
|
||||
|
@ -454,17 +453,11 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
|||
}
|
||||
} else { // timewindow based aggregation stream
|
||||
if (stime == 0) { // no data in meter till now
|
||||
stime = pQueryInfo->window.skey;
|
||||
if (stime == INT64_MIN) {
|
||||
stime = (int64_t)taosGetTimestamp(pStream->precision);
|
||||
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
|
||||
stime = taosTimeTruncate(stime - 1, &pStream->interval, pStream->precision);
|
||||
//stime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
|
||||
//stime = taosGetIntervalStartTimestamp(stime - 1, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
|
||||
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
|
||||
if (pQueryInfo->window.skey != INT64_MIN) {
|
||||
stime = pQueryInfo->window.skey;
|
||||
}
|
||||
stime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
|
||||
} else {
|
||||
//int64_t newStime = taosGetIntervalStartTimestamp(stime, pStream->interval.interval, pStream->interval.interval, pStream->interval.intervalUnit, pStream->precision);
|
||||
int64_t newStime = taosTimeTruncate(stime, &pStream->interval, pStream->precision);
|
||||
if (newStime != stime) {
|
||||
tscWarn("%p stream:%p, last timestamp:%" PRId64 ", reset to:%" PRId64, pSql, pStream, stime, newStime);
|
||||
|
@ -477,8 +470,10 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
|||
}
|
||||
|
||||
static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
|
||||
int64_t timer = pStream->stime - taosGetTimestamp(pStream->precision);
|
||||
if (timer < 0) timer = 0;
|
||||
int64_t timer = 0, now = taosGetTimestamp(pStream->precision);
|
||||
if (pStream->stime > now) {
|
||||
timer = pStream->stime - now;
|
||||
}
|
||||
|
||||
int64_t startDelay =
|
||||
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsStreamCompStartDelay * 1000L : tsStreamCompStartDelay;
|
||||
|
|
|
@ -155,6 +155,7 @@ python3 ./test.py -f stream/new.py
|
|||
python3 ./test.py -f stream/stream1.py
|
||||
python3 ./test.py -f stream/stream2.py
|
||||
python3 ./test.py -f stream/parser.py
|
||||
python3 ./test.py -f stream/history.py
|
||||
|
||||
#alter table
|
||||
python3 ./test.py -f alter/alter_table_crash.py
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import time
|
||||
import taos
|
||||
from util.log import tdLog
|
||||
from util.cases import tdCases
|
||||
from util.sql import tdSql
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
tdSql.execute("create table cars(ts timestamp, s int) tags(id int)")
|
||||
tdSql.execute("create table car0 using cars tags(0)")
|
||||
tdSql.execute("create table car1 using cars tags(1)")
|
||||
tdSql.execute("create table car2 using cars tags(2)")
|
||||
tdSql.execute("create table car3 using cars tags(3)")
|
||||
tdSql.execute("create table car4 using cars tags(4)")
|
||||
|
||||
tdSql.execute("insert into car0 values('2019-01-01 00:00:00.103', 1)")
|
||||
tdSql.execute("insert into car1 values('2019-01-01 00:00:00.234', 1)")
|
||||
tdSql.execute("insert into car0 values('2019-01-01 00:00:01.012', 1)")
|
||||
tdSql.execute("insert into car0 values('2019-01-01 00:00:02.003', 1)")
|
||||
tdSql.execute("insert into car2 values('2019-01-01 00:00:02.328', 1)")
|
||||
tdSql.execute("insert into car0 values('2019-01-01 00:00:03.139', 1)")
|
||||
tdSql.execute("insert into car0 values('2019-01-01 00:00:04.348', 1)")
|
||||
tdSql.execute("insert into car0 values('2019-01-01 00:00:05.783', 1)")
|
||||
tdSql.execute("insert into car1 values('2019-01-01 00:00:01.893', 1)")
|
||||
tdSql.execute("insert into car1 values('2019-01-01 00:00:02.712', 1)")
|
||||
tdSql.execute("insert into car1 values('2019-01-01 00:00:03.982', 1)")
|
||||
tdSql.execute("insert into car3 values('2019-01-01 00:00:01.389', 1)")
|
||||
tdSql.execute("insert into car4 values('2019-01-01 00:00:01.829', 1)")
|
||||
|
||||
tdSql.execute("create table strm as select count(*) from cars interval(4s)")
|
||||
tdSql.waitedQuery("select * from strm", 2, 100)
|
||||
tdSql.checkData(0, 1, 11)
|
||||
tdSql.checkData(1, 1, 2)
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue