test: change sum to high performance version
This commit is contained in:
parent
6d42ddb7f9
commit
70043090dc
|
@ -1,3 +1,5 @@
|
|||
import pickle
|
||||
|
||||
def init():
|
||||
pass
|
||||
|
||||
|
|
|
@ -7,30 +7,20 @@ def destroy():
|
|||
pass
|
||||
|
||||
def start():
|
||||
return pickle.dumps([])
|
||||
return pickle.dumps(None)
|
||||
|
||||
def finish(buf):
|
||||
sums = pickle.loads(buf)
|
||||
all = None
|
||||
for sum in sums:
|
||||
if all is None:
|
||||
all = sum
|
||||
else:
|
||||
all += sum
|
||||
return all
|
||||
sum = pickle.loads(buf)
|
||||
return sum
|
||||
|
||||
def reduce(datablock, buf):
|
||||
(rows, cols) = datablock.shape()
|
||||
sums = pickle.loads(buf)
|
||||
sum = None
|
||||
sum = pickle.loads(buf)
|
||||
for i in range(rows):
|
||||
val = datablock.data(i, 0)
|
||||
if val is not None:
|
||||
if sum is None:
|
||||
sum = val
|
||||
else:
|
||||
sum += val
|
||||
|
||||
if sum is not None:
|
||||
sums.append(sum)
|
||||
return pickle.dumps(sums)
|
||||
sum += val
|
||||
return pickle.dumps(sum)
|
||||
|
|
|
@ -209,12 +209,12 @@ class TDTestCase:
|
|||
tdSql.checkData(i, j, result1[i][j])
|
||||
|
||||
# same value like select col1, udf_fun1(col1) from st
|
||||
def verify_same_value(self, sql):
|
||||
def verify_same_value(self, sql, col=0):
|
||||
tdSql.query(sql)
|
||||
nrows = tdSql.getRows()
|
||||
for i in range(nrows):
|
||||
val = tdSql.getData(i, 0)
|
||||
tdSql.checkData(i, 1, val)
|
||||
val = tdSql.getData(i, col)
|
||||
tdSql.checkData(i, col + 1, val)
|
||||
|
||||
# verify multi values
|
||||
def verify_same_multi_values(self, sql):
|
||||
|
@ -395,6 +395,24 @@ class TDTestCase:
|
|||
tdSql.execute(sql)
|
||||
tdLog.info(f" insert {rows} to child table {self.child_count} .")
|
||||
|
||||
|
||||
# create stream
|
||||
def create_stream(self):
|
||||
sql = f"create stream ma into sta subtable(concat('sta_',tbname)) \
|
||||
as select _wstart,count(col1),af_count_bigint(col1) from {self.stbname} partition by tbname interval(1s);"
|
||||
tdSql.execute(sql)
|
||||
tdLog.info(sql)
|
||||
|
||||
# query stream
|
||||
def verify_stream(self):
|
||||
sql = f"select * from sta limit 10"
|
||||
self.verify_same_value(sql, 1)
|
||||
|
||||
# create tmq
|
||||
def create_tmq(self):
|
||||
sql = f"create topic topa as select concat(col12,t12),sf_concat_var(col12,t12) from {self.stbname};"
|
||||
tdSql.execute(sql)
|
||||
tdLog.info(sql)
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
|
@ -402,14 +420,23 @@ class TDTestCase:
|
|||
stable = "meters"
|
||||
tbname = "d"
|
||||
count = 10
|
||||
rows = 50000
|
||||
rows = 5000
|
||||
# do
|
||||
self.create_table(stable, tbname, count)
|
||||
self.insert_data(tbname, rows)
|
||||
|
||||
# create
|
||||
self.create_scalar_udfpy()
|
||||
self.create_aggr_udfpy()
|
||||
|
||||
# create stream
|
||||
self.create_stream()
|
||||
|
||||
# create tmq
|
||||
self.create_tmq()
|
||||
|
||||
# insert data
|
||||
self.insert_data(tbname, rows)
|
||||
|
||||
# query
|
||||
self.query_scalar_udfpy()
|
||||
self.query_aggr_udfpy()
|
||||
|
|
Loading…
Reference in New Issue