From d81d784a4ae4ca21813b79d4f82aa8393d2316e8 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 19 Apr 2021 10:04:57 +0800 Subject: [PATCH 1/6] fix bug --- src/client/src/tscProfile.c | 2 +- src/client/src/tscStream.c | 4 ++++ src/cq/src/cqMain.c | 2 +- src/kit/shell/src/shellEngine.c | 5 +++++ 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 3b0e1b5775..6492075bbd 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -273,7 +273,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pSdesc->num = htobe64(pStream->num); pSdesc->useconds = htobe64(pStream->useconds); - pSdesc->stime = htobe64(pStream->stime - pStream->interval.interval); + pSdesc->stime = (pStream->stime == INT64_MIN) ? htobe64(pStream->stime) : htobe64(pStream->stime - pStream->interval.interval); pSdesc->ctime = htobe64(pStream->ctime); pSdesc->slidingTime = htobe64(pStream->interval.sliding); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 7699e6f459..89fc1c2621 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -473,6 +473,10 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) { static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + if (stime == INT64_MIN) { + return stime; + } if (pStream->isProject) { // no data in table, flush all data till now to destination meter, 10sec delay diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index d4d202267c..854fbf2ec9 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -430,7 +430,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, (void *)pObj->rid, NULL); + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 0eb1248fad..899a0af615 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -391,6 +391,11 @@ int regex_match(const char *s, const char *reg, int cflags) { static char* formatTimestamp(char* buf, int64_t val, int precision) { + if (val == INT64_MIN) { + memset(buf, ' ', 23); + return buf; + } + if (args.is_raw_time) { sprintf(buf, "%" PRId64, val); return buf; From 091720cfa9258b7f117297dcebb07f9c169ab0c4 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 19 Apr 2021 13:44:09 +0800 Subject: [PATCH 2/6] fix bug --- src/kit/shell/src/shellEngine.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 899a0af615..0eb1248fad 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -391,11 +391,6 @@ int regex_match(const char *s, const char *reg, int cflags) { static char* formatTimestamp(char* buf, int64_t val, int precision) { - if (val == INT64_MIN) { - memset(buf, ' ', 23); - return buf; - } - if (args.is_raw_time) { sprintf(buf, "%" PRId64, val); return buf; From 3976e1d0726adf441cf9e7b5e92398e340e3e309 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Mon, 19 Apr 2021 14:59:39 +0800 Subject: [PATCH 3/6] add test case --- tests/script/general/stream/stream_1970.sim | 73 +++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 tests/script/general/stream/stream_1970.sim diff --git a/tests/script/general/stream/stream_1970.sim b/tests/script/general/stream/stream_1970.sim new file mode 100644 index 0000000000..30a733c08f --- /dev/null +++ b/tests/script/general/stream/stream_1970.sim @@ -0,0 +1,73 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 1 +system sh/exec.sh -n dnode1 -s start + +sleep 2000 +sql connect + +print ======================== dnode1 start + +$dbPrefix = s3_db +$tbPrefix = s3_tb +$mtPrefix = s3_mt +$stPrefix = s3_st +$tbNum = 10 +$rowNum = 20 +$totalNum = 200 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i +$st = $stPrefix . $i + +sql drop databae $db -x step1 +step1: +sql create database $db keep 36500 +sql use $db +sql create stable $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int) + +sql create table cq1 as select count(*) from $mt interval(10s); + +sleep 1000 + +sql create table $st using $mt tags(1); + +sql insert into $st values (-50000, 1, 1.0); +sql insert into $st values (-10000, 1, 1.0); +sql insert into $st values (10000, 1, 1.0); + + +$i = 0 +while $i < 12 + sql select * from cq1; + + if $rows != 3 then + sleep 10000 + else + if $data00 != @70-01-01 07:59:10.000@ then + return -1 + endi + if $data01 != 1 then + return -1 + endi + if $data10 != @70-01-01 07:59:50.000@ then + return -1 + endi + if $data11 != 1 then + return -1 + endi + if $data20 != @70-01-01 08:00:10.000@ then + return -1 + endi + if $data21 != 1 then + return -1 + endi + $i = 12 + endi + + $i = $i + 1 +endw + From acee0af58f7d307421ba885513fcac5bf9852e35 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Tue, 20 Apr 2021 11:24:42 +0800 Subject: [PATCH 4/6] fix case issue --- tests/pytest/stream/stream2.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/pytest/stream/stream2.py b/tests/pytest/stream/stream2.py index d71742048a..9b4eb8725c 100644 --- a/tests/pytest/stream/stream2.py +++ b/tests/pytest/stream/stream2.py @@ -88,6 +88,8 @@ class TDTestCase: except Exception as e: tdLog.info(repr(e)) + + time.sleep(5) tdSql.query("show streams") tdSql.checkRows(1) tdSql.checkData(0, 2, 's0') @@ -146,6 +148,7 @@ class TDTestCase: except Exception as e: tdLog.info(repr(e)) + time.sleep(5) tdSql.query("show streams") tdSql.checkRows(2) tdSql.checkData(0, 2, 's1') From 23f23a445da65e9d95d9c5da237351a2cd4984ae Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Wed, 21 Apr 2021 19:36:46 +0800 Subject: [PATCH 5/6] modify case --- tests/pytest/stream/new.py | 2 +- tests/pytest/util/sql.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pytest/stream/new.py b/tests/pytest/stream/new.py index 70f300e937..4a0e47c01a 100644 --- a/tests/pytest/stream/new.py +++ b/tests/pytest/stream/new.py @@ -42,7 +42,7 @@ class TDTestCase: tdLog.info("=============== step3") start = time.time() - tdSql.waitedQuery("select * from st", 1, 120) + tdSql.waitedQuery("select * from st", 1, 180) delay = int(time.time() - start) + 80 v = tdSql.getData(0, 3) if v >= 51: diff --git a/tests/pytest/util/sql.py b/tests/pytest/util/sql.py index 16931cca33..9d1d3a5703 100644 --- a/tests/pytest/util/sql.py +++ b/tests/pytest/util/sql.py @@ -87,6 +87,7 @@ class TDSql: self.queryResult = self.cursor.fetchall() self.queryRows = len(self.queryResult) self.queryCols = len(self.cursor.description) + tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows)) if self.queryRows >= expectRows: return (self.queryRows, i) time.sleep(1) From 2bfc6a90a5ce889e26f946b02cd493a4b781276e Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Thu, 22 Apr 2021 09:50:44 +0800 Subject: [PATCH 6/6] fix bug --- src/client/src/tscStream.c | 10 ++++++---- tests/pytest/stream/sys.py | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 89fc1c2621..a598e5eec9 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -402,10 +402,12 @@ static void tscSetNextLaunchTimer(SSqlStream *pStream, SSqlObj *pSql) { taos_close_stream(pStream); return; } - - timer = pStream->stime - taosGetTimestamp(pStream->precision); - if (timer < 0) { - timer = 0; + + if (pStream->stime > 0) { + timer = pStream->stime - taosGetTimestamp(pStream->precision); + if (timer < 0) { + timer = 0; + } } } diff --git a/tests/pytest/stream/sys.py b/tests/pytest/stream/sys.py index a73e7043e8..c9a3fccfe6 100644 --- a/tests/pytest/stream/sys.py +++ b/tests/pytest/stream/sys.py @@ -47,7 +47,7 @@ class TDTestCase: "select * from iostrm", ] for sql in sqls: - (rows, _) = tdSql.waitedQuery(sql, 1, 120) + (rows, _) = tdSql.waitedQuery(sql, 1, 240) if rows < 1: tdLog.exit("failed: sql:%s, expect at least one row" % sql)