Merge pull request #5853 from taosdata/hotfix/TD-3805
[TD-3805]CQ time issue
This commit is contained in:
commit
aa65f33cbe
|
@ -273,7 +273,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
||||||
pSdesc->num = htobe64(pStream->num);
|
pSdesc->num = htobe64(pStream->num);
|
||||||
|
|
||||||
pSdesc->useconds = htobe64(pStream->useconds);
|
pSdesc->useconds = htobe64(pStream->useconds);
|
||||||
pSdesc->stime = htobe64(pStream->stime - pStream->interval.interval);
|
pSdesc->stime = (pStream->stime == INT64_MIN) ? htobe64(pStream->stime) : htobe64(pStream->stime - pStream->interval.interval);
|
||||||
pSdesc->ctime = htobe64(pStream->ctime);
|
pSdesc->ctime = htobe64(pStream->ctime);
|
||||||
|
|
||||||
pSdesc->slidingTime = htobe64(pStream->interval.sliding);
|
pSdesc->slidingTime = htobe64(pStream->interval.sliding);
|
||||||
|
|
|
@ -402,10 +402,12 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) {
|
||||||
taos_close_stream(pStream);
|
taos_close_stream(pStream);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
timer = pStream->stime - taosGetTimestamp(pStream->precision);
|
if (pStream->stime > 0) {
|
||||||
if (timer < 0) {
|
timer = pStream->stime - taosGetTimestamp(pStream->precision);
|
||||||
timer = 0;
|
if (timer < 0) {
|
||||||
|
timer = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -473,6 +475,10 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
|
||||||
|
|
||||||
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
|
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
|
|
||||||
|
if (stime == INT64_MIN) {
|
||||||
|
return stime;
|
||||||
|
}
|
||||||
|
|
||||||
if (pStream->isProject) {
|
if (pStream->isProject) {
|
||||||
// no data in table, flush all data till now to destination meter, 10sec delay
|
// no data in table, flush all data till now to destination meter, 10sec delay
|
||||||
|
|
|
@ -449,7 +449,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||||
pObj->tmrId = 0;
|
pObj->tmrId = 0;
|
||||||
|
|
||||||
if (pObj->pStream == NULL) {
|
if (pObj->pStream == NULL) {
|
||||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL);
|
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
|
||||||
|
|
||||||
// TODO the pObj->pStream may be released if error happens
|
// TODO the pObj->pStream may be released if error happens
|
||||||
if (pObj->pStream) {
|
if (pObj->pStream) {
|
||||||
|
|
|
@ -42,7 +42,7 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.info("=============== step3")
|
tdLog.info("=============== step3")
|
||||||
start = time.time()
|
start = time.time()
|
||||||
tdSql.waitedQuery("select * from st", 1, 120)
|
tdSql.waitedQuery("select * from st", 1, 180)
|
||||||
delay = int(time.time() - start) + 80
|
delay = int(time.time() - start) + 80
|
||||||
v = tdSql.getData(0, 3)
|
v = tdSql.getData(0, 3)
|
||||||
if v >= 51:
|
if v >= 51:
|
||||||
|
|
|
@ -88,6 +88,8 @@ class TDTestCase:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(repr(e))
|
tdLog.info(repr(e))
|
||||||
|
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
tdSql.query("show streams")
|
tdSql.query("show streams")
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
tdSql.checkData(0, 2, 's0')
|
tdSql.checkData(0, 2, 's0')
|
||||||
|
@ -146,6 +148,7 @@ class TDTestCase:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
tdLog.info(repr(e))
|
tdLog.info(repr(e))
|
||||||
|
|
||||||
|
time.sleep(5)
|
||||||
tdSql.query("show streams")
|
tdSql.query("show streams")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 2, 's1')
|
tdSql.checkData(0, 2, 's1')
|
||||||
|
|
|
@ -47,7 +47,7 @@ class TDTestCase:
|
||||||
"select * from iostrm",
|
"select * from iostrm",
|
||||||
]
|
]
|
||||||
for sql in sqls:
|
for sql in sqls:
|
||||||
(rows, _) = tdSql.waitedQuery(sql, 1, 120)
|
(rows, _) = tdSql.waitedQuery(sql, 1, 240)
|
||||||
if rows < 1:
|
if rows < 1:
|
||||||
tdLog.exit("failed: sql:%s, expect at least one row" % sql)
|
tdLog.exit("failed: sql:%s, expect at least one row" % sql)
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,7 @@ class TDSql:
|
||||||
self.queryResult = self.cursor.fetchall()
|
self.queryResult = self.cursor.fetchall()
|
||||||
self.queryRows = len(self.queryResult)
|
self.queryRows = len(self.queryResult)
|
||||||
self.queryCols = len(self.cursor.description)
|
self.queryCols = len(self.cursor.description)
|
||||||
|
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
|
||||||
if self.queryRows >= expectRows:
|
if self.queryRows >= expectRows:
|
||||||
return (self.queryRows, i)
|
return (self.queryRows, i)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
sleep 2000
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print ======================== dnode1 start
|
||||||
|
|
||||||
|
$dbPrefix = s3_db
|
||||||
|
$tbPrefix = s3_tb
|
||||||
|
$mtPrefix = s3_mt
|
||||||
|
$stPrefix = s3_st
|
||||||
|
$tbNum = 10
|
||||||
|
$rowNum = 20
|
||||||
|
$totalNum = 200
|
||||||
|
|
||||||
|
print =============== step1
|
||||||
|
$i = 0
|
||||||
|
$db = $dbPrefix . $i
|
||||||
|
$mt = $mtPrefix . $i
|
||||||
|
$st = $stPrefix . $i
|
||||||
|
|
||||||
|
sql drop databae $db -x step1
|
||||||
|
step1:
|
||||||
|
sql create database $db keep 36500
|
||||||
|
sql use $db
|
||||||
|
sql create stable $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)
|
||||||
|
|
||||||
|
sql create table cq1 as select count(*) from $mt interval(10s);
|
||||||
|
|
||||||
|
sleep 1000
|
||||||
|
|
||||||
|
sql create table $st using $mt tags(1);
|
||||||
|
|
||||||
|
sql insert into $st values (-50000, 1, 1.0);
|
||||||
|
sql insert into $st values (-10000, 1, 1.0);
|
||||||
|
sql insert into $st values (10000, 1, 1.0);
|
||||||
|
|
||||||
|
|
||||||
|
$i = 0
|
||||||
|
while $i < 12
|
||||||
|
sql select * from cq1;
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
sleep 10000
|
||||||
|
else
|
||||||
|
if $data00 != @70-01-01 07:59:10.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data01 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data10 != @70-01-01 07:59:50.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data11 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data20 != @70-01-01 08:00:10.000@ then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data21 != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
$i = 12
|
||||||
|
endi
|
||||||
|
|
||||||
|
$i = $i + 1
|
||||||
|
endw
|
||||||
|
|
Loading…
Reference in New Issue