Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837

This commit is contained in:
54liuyao 2024-10-18 16:45:30 +08:00
commit bef7fb6045
11 changed files with 263 additions and 102 deletions

View File

@ -153,7 +153,7 @@ SELECT * from information_schema.`ins_streams`;
由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。 由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。
因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。 因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。MAX_DELAY最小时间是5s如果低于5s创建流计算时会报错。
MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 max delay 指定的时间,则立即触发计算 MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 max delay 指定的时间,则立即触发计算

View File

@ -10733,6 +10733,19 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
"Non window query only support scalar function, aggregate function is not allowed"); "Non window query only support scalar function, aggregate function is not allowed");
} }
if (NULL != pStmt->pOptions->pDelay) {
SValueNode* pVal = (SValueNode*)pStmt->pOptions->pDelay;
int64_t minDelay = 0;
char* str = "5s";
if (DEAL_RES_ERROR != translateValue(pCxt, pVal) && TSDB_CODE_SUCCESS ==
parseNatualDuration(str, strlen(str), &minDelay, &pVal->unit, pVal->node.resType.precision, false)) {
if (pVal->datum.i < minDelay) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"stream max delay must be bigger than 5 session");
}
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -500,6 +500,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
if (!pStr) { if (!pStr) {
if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) { if (onlyCache && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {
(*pWinCode) = TSDB_CODE_FAILED; (*pWinCode) = TSDB_CODE_FAILED;
goto _end;
} }
(*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal); (*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal);
if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) { if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) {

View File

@ -18,6 +18,7 @@ import time
import socket import socket
import json import json
import toml import toml
import subprocess
from frame.boundary import DataBoundary from frame.boundary import DataBoundary
import taos import taos
from frame.log import * from frame.log import *
@ -1830,6 +1831,51 @@ class TDCom:
if i == 1: if i == 1:
self.record_history_ts = ts_value self.record_history_ts = ts_value
def generate_query_result(self, inputfile, test_case):
if not os.path.exists(inputfile):
tdLog.exit(f"Input file '{inputfile}' does not exist.")
else:
self.query_result_file = f"./temp_{test_case}.result"
os.system(f"taos -f {inputfile} | grep -v 'Query OK'|grep -v 'Copyright'| grep -v 'Welcome to the TDengine Command' > {self.query_result_file} ")
return self.query_result_file
def compare_result_files(self, file1, file2):
try:
# use subprocess.run to execute diff/fc commands
# print(file1, file2)
if platform.system().lower() != 'windows':
cmd='diff'
result = subprocess.run([cmd, "-u", "--color", file1, file2], text=True, capture_output=True)
else:
cmd='fc'
result = subprocess.run([cmd, file1, file2], text=True, capture_output=True)
# if result is not empty, print the differences and files name. Otherwise, the files are identical.
if result.stdout:
tdLog.debug(f"Differences between {file1} and {file2}")
tdLog.notice(f"\r\n{result.stdout}")
return False
else:
return True
except FileNotFoundError:
tdLog.debug("The 'diff' command is not found. Please make sure it's installed and available in your PATH.")
except Exception as e:
tdLog.debug(f"An error occurred: {e}")
def compare_testcase_result(self, inputfile,expected_file,test_case):
test_reulst_file = self.generate_query_result(inputfile,test_case)
if self.compare_result_files(expected_file, test_reulst_file ):
tdLog.info("Test passed: Result files are identical.")
os.system(f"rm -f {test_reulst_file}")
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.exit(f"{caller.lineno}(line:{caller.lineno}) failed: sqlfile is {inputfile}, expect_file:{expected_file} != reult_file:{test_reulst_file} ")
tdLog.exit("Test failed: Result files are different.")
def is_json(msg): def is_json(msg):
if isinstance(msg, str): if isinstance(msg, str):
try: try:
@ -1864,4 +1910,6 @@ def dict2toml(in_dict: dict, file:str):
with open(file, 'w') as f: with open(file, 'w') as f:
toml.dump(in_dict, f) toml.dump(in_dict, f)
tdCom = TDCom() tdCom = TDCom()

View File

@ -0,0 +1,114 @@
taos> select pi()
pi() |
============================
3.141592653589793 |
taos> select pi() + 1
pi() + 1 |
============================
4.141592653589793 |
taos> select pi() - 1
pi() - 1 |
============================
2.141592653589793 |
taos> select pi() * 2
pi() * 2 |
============================
6.283185307179586 |
taos> select pi() / 2
pi() / 2 |
============================
1.570796326794897 |
taos> select pi() from ts_4893.meters limit 5
pi() |
============================
3.141592653589793 |
3.141592653589793 |
3.141592653589793 |
3.141592653589793 |
3.141592653589793 |
taos> select pi() + 1 from ts_4893.meters limit 1
pi() + 1 |
============================
4.141592653589793 |
taos> select pi() - 1 from ts_4893.meters limit 1
pi() - 1 |
============================
2.141592653589793 |
taos> select pi() * 2 from ts_4893.meters limit 1
pi() * 2 |
============================
6.283185307179586 |
taos> select pi() / 2 from ts_4893.meters limit 1
pi() / 2 |
============================
1.570796326794897 |
taos> select pi() + pi() from ts_4893.meters limit 1
pi() + pi() |
============================
6.283185307179586 |
taos> select pi() - pi() from ts_4893.meters limit 1
pi() - pi() |
============================
0.000000000000000 |
taos> select pi() * pi() from ts_4893.meters limit 1
pi() * pi() |
============================
9.869604401089358 |
taos> select pi() / pi() from ts_4893.meters limit 1
pi() / pi() |
============================
1.000000000000000 |
taos> select pi() + id from ts_4893.meters order by ts limit 5
pi() + id |
============================
3.141592653589793 |
4.141592653589793 |
5.141592653589793 |
6.141592653589793 |
7.141592653589793 |
taos> select abs(pi())
abs(pi()) |
============================
3.141592653589793 |
taos> select pow(pi(), 2)
pow(pi(), 2) |
============================
9.869604401089358 |
taos> select sqrt(pi())
sqrt(pi()) |
============================
1.772453850905516 |
taos> select cast(pi() as int)
cast(pi() as int) |
====================
3 |
taos> select pi()
pi() |
============================
3.141592653589793 |
taos> select substring_index(null, '.', 2)
substring_index(null, '.', 2) |
================================
NULL |
Can't render this file because it has a wrong number of fields in line 90.

View File

@ -1,20 +1,21 @@
select pi(); select pi()
select pi() + 1; select pi() + 1
select pi() - 1; select pi() - 1
select pi() * 2; select pi() * 2
select pi() / 2; select pi() / 2
select pi() from ts_4893.meters limit 5; select pi() from ts_4893.meters limit 5
select pi() + 1 from ts_4893.meters limit 1; select pi() + 1 from ts_4893.meters limit 1
select pi() - 1 from ts_4893.meters limit 1; select pi() - 1 from ts_4893.meters limit 1
select pi() * 2 from ts_4893.meters limit 1; select pi() * 2 from ts_4893.meters limit 1
select pi() / 2 from ts_4893.meters limit 1; select pi() / 2 from ts_4893.meters limit 1
select pi() + pi() from ts_4893.meters limit 1; select pi() + pi() from ts_4893.meters limit 1
select pi() - pi() from ts_4893.meters limit 1; select pi() - pi() from ts_4893.meters limit 1
select pi() * pi() from ts_4893.meters limit 1; select pi() * pi() from ts_4893.meters limit 1
select pi() / pi() from ts_4893.meters limit 1; select pi() / pi() from ts_4893.meters limit 1
select pi() + id from ts_4893.meters order by ts limit 5; select pi() + id from ts_4893.meters order by ts limit 5
select abs(pi()); select abs(pi())
select pow(pi(), 2); select pow(pi(), 2)
select sqrt(pi()); select sqrt(pi())
select cast(pi() as int); select cast(pi() as int)
select pi(); select pi()
select substring_index(null, '.', 2)

View File

@ -17,14 +17,15 @@ import random
import taos import taos
import frame import frame
import frame.etool
from frame.etool import *
from frame.log import * from frame.log import *
from frame.cases import * from frame.cases import *
from frame.sql import * from frame.sql import *
from frame.caseBase import * from frame.caseBase import *
from frame import *
from frame import etool
from frame.common import *
class TDTestCase(TBase): class TDTestCase(TBase):
updatecfgDict = { updatecfgDict = {
@ -84,8 +85,16 @@ class TDTestCase(TBase):
tdSql.error(err_statement) tdSql.error(err_statement)
err_statement = '' err_statement = ''
def test_normal_query_new(self, testCase):
# read sql from .sql file and execute
tdLog.info(f"test normal query.")
self.sqlFile = etool.curFile(__file__, f"in/{testCase}.in")
self.ansFile = etool.curFile(__file__, f"ans/{testCase}_1.csv")
tdCom.compare_testcase_result(self.sqlFile, self.ansFile, testCase)
def test_pi(self): def test_pi(self):
self.test_normal_query("pi") self.test_normal_query_new("pi")
def test_round(self): def test_round(self):
self.test_normal_query("round") self.test_normal_query("round")

View File

@ -133,4 +133,17 @@ if $data13 != -111 then
goto loop1 goto loop1
endi endi
print step 2====================
sql create database test vgroups 1 ;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql_error create stream streams1 trigger max_delay 4000a ignore update 0 ignore expired 0 into streamtST1 as select _wstart, count(*) from st interval(5s);
sql_error create stream streams2 trigger max_delay 4s ignore update 0 ignore expired 0 into streamtST2 as select _wstart, count(*) from st interval(5s);
sql create stream streams3 trigger max_delay 5000a ignore update 0 ignore expired 0 into streamtST3 as select _wstart, count(*) from st interval(5s);
sql create stream streams4 trigger max_delay 5s ignore update 0 ignore expired 0 into streamtST4 as select _wstart, count(*) from st interval(5s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -48,15 +48,15 @@ sql create table t1 using st tags(1);
sql create table t2 using st tags(2); sql create table t2 using st tags(2);
sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s); sql create stream stream2 trigger window_close into streamt2 as select _wstart, sum(a) from st interval(10s);
sql create stream stream3 trigger max_delay 1s into streamt3 as select _wstart, sum(a) from st interval(10s); sql create stream stream3 trigger max_delay 5s into streamt3 as select _wstart, sum(a) from st interval(10s);
sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s); sql create stream stream4 trigger window_close into streamt4 as select _wstart, sum(a) from t1 interval(10s);
sql create stream stream5 trigger max_delay 1s into streamt5 as select _wstart, sum(a) from t1 interval(10s); sql create stream stream5 trigger max_delay 5s into streamt5 as select _wstart, sum(a) from t1 interval(10s);
sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s); sql create stream stream6 trigger window_close into streamt6 as select _wstart, sum(a) from st session(ts, 10s);
sql create stream stream7 trigger max_delay 1s into streamt7 as select _wstart, sum(a) from st session(ts, 10s); sql create stream stream7 trigger max_delay 5s into streamt7 as select _wstart, sum(a) from st session(ts, 10s);
sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s); sql create stream stream8 trigger window_close into streamt8 as select _wstart, sum(a) from t1 session(ts, 10s);
sql create stream stream9 trigger max_delay 1s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s); sql create stream stream9 trigger max_delay 5s into streamt9 as select _wstart, sum(a) from t1 session(ts, 10s);
sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b); sql create stream stream10 trigger window_close into streamt10 as select _wstart, sum(a) from t1 state_window(b);
sql create stream stream11 trigger max_delay 1s into streamt11 as select _wstart, sum(a) from t1 state_window(b); sql create stream stream11 trigger max_delay 5s into streamt11 as select _wstart, sum(a) from t1 state_window(b);
run tsim/stream/checkTaskStatus.sim run tsim/stream/checkTaskStatus.sim
@ -138,12 +138,12 @@ if $rows != 2 then
goto loop1 goto loop1
endi endi
print step 1 max delay 2s print step 1 max delay 5s
sql create database test3 vgroups 4; sql create database test3 vgroups 4;
sql use test3; sql use test3;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream13 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 interval(10s); sql create stream stream13 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 interval(10s);
run tsim/stream/checkTaskStatus.sim run tsim/stream/checkTaskStatus.sim
@ -172,8 +172,8 @@ $now02 = $data02
$now12 = $data12 $now12 = $data12
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;
@ -188,7 +188,7 @@ if $data12 != $now12 then
return -1 return -1
endi endi
print step 2 max delay 2s print step 2 max delay 5s
sql create database test4 vgroups 4; sql create database test4 vgroups 4;
sql use test4; sql use test4;
@ -197,7 +197,7 @@ sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,t
sql create table t1 using st tags(1,1,1); sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2); sql create table t2 using st tags(2,2,2);
sql create stream stream14 trigger max_delay 2s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s); sql create stream stream14 trigger max_delay 5s into streamt14 as select _wstart, sum(a), now from st partition by tbname interval(10s);
run tsim/stream/checkTaskStatus.sim run tsim/stream/checkTaskStatus.sim
@ -234,8 +234,8 @@ $now12 = $data12
$now22 = $data22 $now22 = $data22
$now32 = $data32 $now32 = $data32
print step2 max delay 2s......... sleep 3s print step2 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt14 order by 2; sql select * from streamt14 order by 2;
print $data00 $data01 $data02 print $data00 $data01 $data02
@ -264,8 +264,8 @@ if $data32 != $now32 then
return -1 return -1
endi endi
print step2 max delay 2s......... sleep 3s print step2 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt14 order by 2; sql select * from streamt14 order by 2;
print $data00 $data01 $data02 print $data00 $data01 $data02
@ -294,12 +294,12 @@ if $data32 != $now32 then
return -1 return -1
endi endi
print step 2 max delay 2s print step 2 max delay 5s
sql create database test15 vgroups 4; sql create database test15 vgroups 4;
sql use test15; sql use test15;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream15 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s); sql create stream stream15 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 session(ts, 10s);
run tsim/stream/checkTaskStatus.sim run tsim/stream/checkTaskStatus.sim
@ -328,8 +328,8 @@ $now02 = $data02
$now12 = $data12 $now12 = $data12
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;
@ -344,8 +344,8 @@ if $data12 != $now12 then
return -1 return -1
endi endi
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;
@ -362,12 +362,12 @@ endi
print session max delay over print session max delay over
print step 3 max delay 2s print step 3 max delay 5s
sql create database test16 vgroups 4; sql create database test16 vgroups 4;
sql use test16; sql use test16;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream16 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 state_window(a); sql create stream stream16 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 state_window(a);
run tsim/stream/checkTaskStatus.sim run tsim/stream/checkTaskStatus.sim
@ -396,8 +396,8 @@ $now02 = $data02
$now12 = $data12 $now12 = $data12
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;
@ -412,8 +412,8 @@ if $data12 != $now12 then
return -1 return -1
endi endi
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;
@ -430,12 +430,12 @@ endi
print state max delay over print state max delay over
print step 4 max delay 2s print step 4 max delay 5s
sql create database test17 vgroups 4; sql create database test17 vgroups 4;
sql use test17; sql use test17;
sql create table t1(ts timestamp, a int, b int , c int, d double); sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream stream17 trigger max_delay 2s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9; sql create stream stream17 trigger max_delay 5s into streamt13 as select _wstart, sum(a), now from t1 event_window start with a = 1 end with a = 9;
run tsim/stream/checkTaskStatus.sim run tsim/stream/checkTaskStatus.sim
@ -467,8 +467,8 @@ $now02 = $data02
$now12 = $data12 $now12 = $data12
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;
@ -483,8 +483,8 @@ if $data12 != $now12 then
return -1 return -1
endi endi
print step1 max delay 2s......... sleep 3s print step1 max delay 5s......... sleep 6s
sleep 3000 sleep 6000
sql select * from streamt13; sql select * from streamt13;

View File

@ -24,45 +24,6 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor()) tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
# def consume_TS_4674_Test(self):
#
# os.system("nohup taosBenchmark -y -B 1 -t 4 -S 1000 -n 1000000 -i 1000 -v 1 -a 3 > /dev/null 2>&1 &")
# time.sleep()
# tdSql.execute(f'create topic topic_all with meta as database test')
# consumer_dict = {
# "group.id": "g1",
# "td.connect.user": "root",
# "td.connect.pass": "taosdata",
# "auto.offset.reset": "earliest",
# }
# consumer = Consumer(consumer_dict)
#
# try:
# consumer.subscribe(["topic_all"])
# except TmqError:
# tdLog.exit(f"subscribe error")
#
# try:
# while True:
# res = consumer.poll(5)
# if not res:
# print(f"null")
# continue
# val = res.value()
# if val is None:
# print(f"null")
# continue
# cnt = 0;
# for block in val:
# cnt += len(block.fetchall())
#
# print(f"block {cnt} rows")
#
# finally:
# consumer.close()
def get_leader(self): def get_leader(self):
tdLog.debug("get leader") tdLog.debug("get leader")
tdSql.query("show vnodes") tdSql.query("show vnodes")
@ -74,19 +35,20 @@ class TDTestCase:
def balance_vnode(self): def balance_vnode(self):
leader_before = self.get_leader() leader_before = self.get_leader()
tdSql.query("balance vgroup leader")
while True: while True:
leader_after = -1 leader_after = -1
tdSql.query("balance vgroup leader") tdLog.debug("balancing vgroup leader")
while True: while True:
tdLog.debug("get new vgroup leader")
leader_after = self.get_leader() leader_after = self.get_leader()
if leader_after != -1 : if leader_after != -1 :
break; break
else: else:
time.sleep(1) time.sleep(1)
if leader_after != leader_before: if leader_after != leader_before:
tdLog.debug("leader changed") tdLog.debug("leader changed")
break; break
else : else :
time.sleep(1) time.sleep(1)
@ -115,7 +77,7 @@ class TDTestCase:
except TmqError: except TmqError:
tdLog.exit(f"subscribe error") tdLog.exit(f"subscribe error")
cnt = 0; cnt = 0
balance = False balance = False
try: try:
while True: while True:

View File

@ -92,7 +92,7 @@ class TDTestCase:
def run(self): def run(self):
for fill_history_value in [None, 1]: for fill_history_value in [None, 1]:
for watermark in [None, random.randint(20, 30)]: for watermark in [None, random.randint(20, 30)]:
self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(1, 3)}s", fill_history_value=fill_history_value) self.watermark_max_delay_session(session=random.randint(10, 15), watermark=watermark, max_delay=f"{random.randint(5, 8)}s", fill_history_value=fill_history_value)
def stop(self): def stop(self):
tdSql.close() tdSql.close()