From 986d5c77525fb87e468ea5060d210caa83f0312a Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Tue, 12 Jul 2022 14:45:00 +0800 Subject: [PATCH 1/6] test:add test case for udf coltrol --- tests/system-test/0-others/udf_cfg1.py | 305 +++++++++++ tests/system-test/0-others/udf_cfg2.py | 667 +++++++++++++++++++++++++ tests/system-test/fulltest.sh | 2 + 3 files changed, 974 insertions(+) create mode 100644 tests/system-test/0-others/udf_cfg1.py create mode 100644 tests/system-test/0-others/udf_cfg2.py diff --git a/tests/system-test/0-others/udf_cfg1.py b/tests/system-test/0-others/udf_cfg1.py new file mode 100644 index 0000000000..fc9265617d --- /dev/null +++ b/tests/system-test/0-others/udf_cfg1.py @@ -0,0 +1,305 @@ +from distutils.log import error +import taos +import sys +import time +import os +import platform + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +import subprocess + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143, + "jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143, "qDebugFlag": 143, + "wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143, "fsDebugFlag": 143, "fnDebugFlag": 143 ,"udf":0} + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), logSql) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def prepare_udf_so(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + print(projPath) + + if platform.system().lower() == 'windows': + self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf2 = subprocess.Popen('(for /r %s %%i in ("udf2.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + if (not tdDnodes.dnodes[0].remoteIP == ""): + tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\") + tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2.so',projPath+"\\debug\\build\\lib\\") + self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so') + self.libudf2 = self.libudf2.replace('udf2.dll','libudf2.so') + else: + self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf1 = self.libudf1.replace('\r','').replace('\n','') + self.libudf2 = self.libudf2.replace('\r','').replace('\n','') + + + def prepare_data(self): + + tdSql.execute("drop database if exists db ") + tdSql.execute("create database if not exists db duration 300") + tdSql.execute("use db") + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + tdSql.execute("create table tb (ts timestamp , num1 int , num2 int, num3 double , num4 binary(30))") + tdSql.execute( + f'''insert into tb values + ( '2020-04-21 01:01:01.000', NULL, 1, 1, "binary1" ) + ( '2020-10-21 01:01:01.000', 1, 1, 1.11, "binary1" ) + ( '2020-12-31 01:01:01.000', 2, 22222, 22, "binary1" ) + ( '2021-01-01 01:01:06.000', 3, 33333, 33, "binary1" ) + ( '2021-05-07 01:01:10.000', 4, 44444, 44, "binary1" ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, "binary1" ) + ( '2021-09-30 01:01:16.000', 5, 55555, 55, "binary1" ) + ( '2022-02-01 01:01:20.000', 6, 66666, 66, "binary1" ) + ( '2022-10-28 01:01:26.000', 0, 00000, 00, "binary1" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -88, "binary1" ) + ( '2022-12-31 01:01:36.000', 9, -9999999, -99, "binary1" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, "binary1" ) + ''' + ) + + # udf functions with join + ts_start = 1652517451000 + tdSql.execute("create stable st (ts timestamp , c1 int , c2 int ,c3 double ,c4 double ) tags(ind int)") + tdSql.execute("create table sub1 using st tags(1)") + tdSql.execute("create table sub2 using st tags(2)") + + for i in range(10): + ts = ts_start + i *1000 + tdSql.execute(" insert into sub1 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) + tdSql.execute(" insert into sub2 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) + + + def create_udf_function(self): + + for i in range(5): + # create scalar functions + tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1) + + # create aggregate functions + + tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2) + + functions = tdSql.getResult("show functions") + function_nums = len(functions) + if function_nums == 2: + tdLog.info("create two udf functions success ") + + # drop functions + + tdSql.execute("drop function udf1") + tdSql.execute("drop function udf2") + + functions = tdSql.getResult("show functions") + for function in functions: + if "udf1" in function[0] or "udf2" in function[0]: + tdLog.info("drop udf functions failed ") + tdLog.exit("drop udf functions failed") + + tdLog.info("drop two udf functions success ") + + # create scalar functions + tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1) + + # create aggregate functions + + tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2) + + functions = tdSql.getResult("show functions") + function_nums = len(functions) + if function_nums == 2: + tdLog.info("create two udf functions success ") + + def basic_udf_query(self): + + # scalar functions + + tdSql.execute("use db ") + tdSql.error("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") + tdSql.error("select c1 , udf1(c1) ,c2 ,udf1(c2), c3 ,udf1(c3), c4 ,udf1(c4) from stb1 order by c1") + + # aggregate functions + tdSql.error("select udf2(num1) ,udf2(num2), udf2(num3) from tb") + + # Arithmetic compute + tdSql.error("select udf2(num1)+100 ,udf2(num2)-100, udf2(num3)*100 ,udf2(num3)/100 from tb") + + tdSql.error("select udf2(c1) ,udf2(c6) from stb1 ") + + tdSql.error("select udf2(c1)+100 ,udf2(c6)-100 ,udf2(c1)*100 ,udf2(c6)/100 from stb1 ") + + # # bug for crash when query sub table + tdSql.error("select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from ct1") + + tdSql.error("select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from stb1 ") + + # regular table with aggregate functions + + tdSql.error("select udf1(num1) , count(num1) from tb;") + tdSql.error("select udf1(num1) , avg(num1) from tb;") + tdSql.error("select udf1(num1) , twa(num1) from tb;") + tdSql.error("select udf1(num1) , irate(num1) from tb;") + tdSql.error("select udf1(num1) , sum(num1) from tb;") + tdSql.error("select udf1(num1) , stddev(num1) from tb;") + tdSql.error("select udf1(num1) , mode(num1) from tb;") + tdSql.error("select udf1(num1) , HYPERLOGLOG(num1) from tb;") + # stable + tdSql.error("select udf1(c1) , count(c1) from stb1;") + tdSql.error("select udf1(c1) , avg(c1) from stb1;") + tdSql.error("select udf1(c1) , twa(c1) from stb1;") + tdSql.error("select udf1(c1) , irate(c1) from stb1;") + tdSql.error("select udf1(c1) , sum(c1) from stb1;") + tdSql.error("select udf1(c1) , stddev(c1) from stb1;") + tdSql.error("select udf1(c1) , mode(c1) from stb1;") + tdSql.error("select udf1(c1) , HYPERLOGLOG(c1) from stb1;") + + # regular table with select functions + + tdSql.error("select udf1(num1) , max(num1) from tb;") + + + tdSql.error("select udf1(num1) , min(num1) from tb;") + tdSql.error("select udf1(num1) , first(num1) from tb;") + tdSql.error("select udf1(num1) , last(num1) from tb;") + tdSql.error("select udf1(num1) , top(num1,1) from tb;") + tdSql.error("select udf1(num1) , bottom(num1,1) from tb;") + + + # stable + tdSql.error("select udf1(c1) , max(c1) from stb1;") + tdSql.error("select udf1(c1) , min(c1) from stb1;") + tdSql.error("select udf1(c1) , first(c1) from stb1;") + tdSql.error("select udf1(c1) , last(c1) from stb1;") + tdSql.error("select udf1(c1) , top(c1 ,1) from stb1;") + tdSql.error("select udf1(c1) , bottom(c1,1) from stb1;") + + + # regular table with compute functions + + tdSql.error("select udf1(num1) , abs(num1) from tb;") + + # stable with compute functions + tdSql.error("select udf1(c1) , abs(c1) from stb1;") + + # nest query + tdSql.error("select abs(udf1(c1)) , abs(ceil(c1)) from stb1 order by ts;") + tdSql.error("select abs(udf1(c1)) , abs(ceil(c1)) from ct1 order by ts;") + + # bug fix for crash + # order by udf function result + for _ in range(50): + tdSql.error("select udf2(c1) from stb1 group by 1-udf1(c1)") + print(tdSql.queryResult) + + # udf functions with filter + + tdSql.error("select c1 ,udf1(c1) , c6 ,udf1(c6) from stb1 where c1 > 8 order by ts") + tdSql.error("select udf1(sub1.c1), udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + + tdSql.error("select sub1.c1 , udf1(sub1.c1), sub2.c2 ,udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.error("select udf2(sub1.c1), udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.error("select sub1.c1 , udf2(sub1.c1), sub2.c2 ,udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + + # udf functions with group by + tdSql.error("select udf1(c1) from ct1 group by c1") + tdSql.error("select udf1(c1) from stb1 group by c1") + tdSql.error("select c1,c2, udf1(c1,c2) from ct1 group by c1,c2") + tdSql.error("select c1,c2, udf1(c1,c2) from stb1 group by c1,c2") + tdSql.error("select udf2(c1) from ct1 group by c1") + tdSql.error("select udf2(c1) from stb1 group by c1") + tdSql.error("select c1,c2, udf2(c1,c6) from ct1 group by c1,c2") + tdSql.error("select c1,c2, udf2(c1,c6) from stb1 group by c1,c2") + tdSql.error("select udf2(c1) from stb1 group by udf1(c1)") + tdSql.error("select udf2(c1) from stb1 group by floor(c1)") + + # udf mix with order by + tdSql.error("select udf2(c1) from stb1 group by floor(c1) order by udf2(c1)") + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + + print(" env is ok for all ") + self.prepare_udf_so() + self.prepare_data() + self.create_udf_function() + self.basic_udf_query() + + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/0-others/udf_cfg2.py b/tests/system-test/0-others/udf_cfg2.py new file mode 100644 index 0000000000..07f83b1455 --- /dev/null +++ b/tests/system-test/0-others/udf_cfg2.py @@ -0,0 +1,667 @@ +from distutils.log import error +import taos +import sys +import time +import os +import platform + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +import subprocess + +class TDTestCase: + updatecfgDict = {'debugFlag': 143, "cDebugFlag": 143, "uDebugFlag": 143, "rpcDebugFlag": 143, "tmrDebugFlag": 143, + "jniDebugFlag": 143, "simDebugFlag": 143, "dDebugFlag": 143, "dDebugFlag": 143, "vDebugFlag": 143, "mDebugFlag": 143, "qDebugFlag": 143, + "wDebugFlag": 143, "sDebugFlag": 143, "tsdbDebugFlag": 143, "tqDebugFlag": 143, "fsDebugFlag": 143, "fnDebugFlag": 143 ,"udf":1} + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), logSql) + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def prepare_udf_so(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + print(projPath) + + if platform.system().lower() == 'windows': + self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf2 = subprocess.Popen('(for /r %s %%i in ("udf2.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + if (not tdDnodes.dnodes[0].remoteIP == ""): + tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\") + tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf2.so',projPath+"\\debug\\build\\lib\\") + self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so') + self.libudf2 = self.libudf2.replace('udf2.dll','libudf2.so') + else: + self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf2 = subprocess.Popen('find %s -name "libudf2.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8") + self.libudf1 = self.libudf1.replace('\r','').replace('\n','') + self.libudf2 = self.libudf2.replace('\r','').replace('\n','') + + + def prepare_data(self): + + tdSql.execute("drop database if exists db ") + tdSql.execute("create database if not exists db duration 300") + tdSql.execute("use db") + tdSql.execute( + '''create table stb1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + tags (t1 int) + ''' + ) + + tdSql.execute( + ''' + create table t1 + (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) + ''' + ) + for i in range(4): + tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') + + for i in range(9): + tdSql.execute( + f"insert into ct1 values ( now()-{i*10}s, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute( + f"insert into ct4 values ( now()-{i*90}d, {1*i}, {11111*i}, {111*i}, {11*i}, {1.11*i}, {11.11*i}, {i%2}, 'binary{i}', 'nchar{i}', now()+{1*i}a )" + ) + tdSql.execute("insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )") + tdSql.execute("insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )") + tdSql.execute("insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )") + + tdSql.execute("insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + tdSql.execute("insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ") + + tdSql.execute( + f'''insert into t1 values + ( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a ) + ( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a ) + ( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a ) + ( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a ) + ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) + ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) + ''' + ) + + tdSql.execute("create table tb (ts timestamp , num1 int , num2 int, num3 double , num4 binary(30))") + tdSql.execute( + f'''insert into tb values + ( '2020-04-21 01:01:01.000', NULL, 1, 1, "binary1" ) + ( '2020-10-21 01:01:01.000', 1, 1, 1.11, "binary1" ) + ( '2020-12-31 01:01:01.000', 2, 22222, 22, "binary1" ) + ( '2021-01-01 01:01:06.000', 3, 33333, 33, "binary1" ) + ( '2021-05-07 01:01:10.000', 4, 44444, 44, "binary1" ) + ( '2021-07-21 01:01:01.000', NULL, NULL, NULL, "binary1" ) + ( '2021-09-30 01:01:16.000', 5, 55555, 55, "binary1" ) + ( '2022-02-01 01:01:20.000', 6, 66666, 66, "binary1" ) + ( '2022-10-28 01:01:26.000', 0, 00000, 00, "binary1" ) + ( '2022-12-01 01:01:30.000', 8, -88888, -88, "binary1" ) + ( '2022-12-31 01:01:36.000', 9, -9999999, -99, "binary1" ) + ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, "binary1" ) + ''' + ) + + # udf functions with join + ts_start = 1652517451000 + tdSql.execute("create stable st (ts timestamp , c1 int , c2 int ,c3 double ,c4 double ) tags(ind int)") + tdSql.execute("create table sub1 using st tags(1)") + tdSql.execute("create table sub2 using st tags(2)") + + for i in range(10): + ts = ts_start + i *1000 + tdSql.execute(" insert into sub1 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) + tdSql.execute(" insert into sub2 values({} , {},{},{},{})".format(ts,i ,i*10,i*100.0,i*1000.0)) + + + def create_udf_function(self): + + for i in range(5): + # create scalar functions + tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1) + + # create aggregate functions + + tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2) + + functions = tdSql.getResult("show functions") + function_nums = len(functions) + if function_nums == 2: + tdLog.info("create two udf functions success ") + + # drop functions + + tdSql.execute("drop function udf1") + tdSql.execute("drop function udf2") + + functions = tdSql.getResult("show functions") + for function in functions: + if "udf1" in function[0] or "udf2" in function[0]: + tdLog.info("drop udf functions failed ") + tdLog.exit("drop udf functions failed") + + tdLog.info("drop two udf functions success ") + + # create scalar functions + tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1) + + # create aggregate functions + + tdSql.execute("create aggregate function udf2 as '%s' outputtype double bufSize 8;"%self.libudf2) + + functions = tdSql.getResult("show functions") + function_nums = len(functions) + if function_nums == 2: + tdLog.info("create two udf functions success ") + + def basic_udf_query(self): + + # scalar functions + + tdSql.execute("use db ") + tdSql.query("select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb") + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,None) + tdSql.checkData(0,2,1) + tdSql.checkData(0,3,88) + tdSql.checkData(0,4,1.000000000) + tdSql.checkData(0,5,88) + tdSql.checkData(0,6,"binary1") + tdSql.checkData(0,7,88) + + tdSql.checkData(3,0,3) + tdSql.checkData(3,1,88) + tdSql.checkData(3,2,33333) + tdSql.checkData(3,3,88) + tdSql.checkData(3,4,33.000000000) + tdSql.checkData(3,5,88) + tdSql.checkData(3,6,"binary1") + tdSql.checkData(3,7,88) + + tdSql.checkData(11,0,None) + tdSql.checkData(11,1,None) + tdSql.checkData(11,2,None) + tdSql.checkData(11,3,None) + tdSql.checkData(11,4,None) + tdSql.checkData(11,5,None) + tdSql.checkData(11,6,"binary1") + tdSql.checkData(11,7,88) + + tdSql.query("select c1 , udf1(c1) ,c2 ,udf1(c2), c3 ,udf1(c3), c4 ,udf1(c4) from stb1 order by c1") + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,None) + tdSql.checkData(0,2,None) + tdSql.checkData(0,3,None) + tdSql.checkData(0,4,None) + tdSql.checkData(0,5,None) + tdSql.checkData(0,6,None) + tdSql.checkData(0,7,None) + + tdSql.checkData(20,0,8) + tdSql.checkData(20,1,88) + tdSql.checkData(20,2,88888) + tdSql.checkData(20,3,88) + tdSql.checkData(20,4,888) + tdSql.checkData(20,5,88) + tdSql.checkData(20,6,88) + tdSql.checkData(20,7,88) + + + # aggregate functions + tdSql.query("select udf2(num1) ,udf2(num2), udf2(num3) from tb") + tdSql.checkData(0,0,15.362291496) + tdSql.checkData(0,1,10000949.553189287) + tdSql.checkData(0,2,168.633425216) + + # Arithmetic compute + tdSql.query("select udf2(num1)+100 ,udf2(num2)-100, udf2(num3)*100 ,udf2(num3)/100 from tb") + tdSql.checkData(0,0,115.362291496) + tdSql.checkData(0,1,10000849.553189287) + tdSql.checkData(0,2,16863.342521576) + tdSql.checkData(0,3,1.686334252) + + tdSql.query("select udf2(c1) ,udf2(c6) from stb1 ") + tdSql.checkData(0,0,25.514701644) + tdSql.checkData(0,1,265.247614504) + + tdSql.query("select udf2(c1)+100 ,udf2(c6)-100 ,udf2(c1)*100 ,udf2(c6)/100 from stb1 ") + tdSql.checkData(0,0,125.514701644) + tdSql.checkData(0,1,165.247614504) + tdSql.checkData(0,2,2551.470164435) + tdSql.checkData(0,3,2.652476145) + + # # bug for crash when query sub table + tdSql.query("select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from ct1") + tdSql.checkData(0,0,378.215547010) + tdSql.checkData(0,1,353.808067460) + tdSql.checkData(0,2,2114.237451187) + tdSql.checkData(0,3,2.125468151) + + tdSql.query("select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from stb1 ") + tdSql.checkData(0,0,490.358032462) + tdSql.checkData(0,1,400.460106627) + tdSql.checkData(0,2,2551.470164435) + tdSql.checkData(0,3,2.652476145) + + + # regular table with aggregate functions + + tdSql.error("select udf1(num1) , count(num1) from tb;") + tdSql.error("select udf1(num1) , avg(num1) from tb;") + tdSql.error("select udf1(num1) , twa(num1) from tb;") + tdSql.error("select udf1(num1) , irate(num1) from tb;") + tdSql.error("select udf1(num1) , sum(num1) from tb;") + tdSql.error("select udf1(num1) , stddev(num1) from tb;") + tdSql.error("select udf1(num1) , mode(num1) from tb;") + tdSql.error("select udf1(num1) , HYPERLOGLOG(num1) from tb;") + # stable + tdSql.error("select udf1(c1) , count(c1) from stb1;") + tdSql.error("select udf1(c1) , avg(c1) from stb1;") + tdSql.error("select udf1(c1) , twa(c1) from stb1;") + tdSql.error("select udf1(c1) , irate(c1) from stb1;") + tdSql.error("select udf1(c1) , sum(c1) from stb1;") + tdSql.error("select udf1(c1) , stddev(c1) from stb1;") + tdSql.error("select udf1(c1) , mode(c1) from stb1;") + tdSql.error("select udf1(c1) , HYPERLOGLOG(c1) from stb1;") + + # regular table with select functions + + tdSql.query("select udf1(num1) , max(num1) from tb;") + tdSql.checkRows(1) + tdSql.query("select floor(num1) , max(num1) from tb;") + tdSql.checkRows(1) + tdSql.query("select udf1(num1) , min(num1) from tb;") + tdSql.checkRows(1) + tdSql.query("select ceil(num1) , min(num1) from tb;") + tdSql.checkRows(1) + tdSql.query("select udf1(num1) , first(num1) from tb;") + + tdSql.query("select abs(num1) , first(num1) from tb;") + + tdSql.query("select udf1(num1) , last(num1) from tb;") + + tdSql.query("select round(num1) , last(num1) from tb;") + + tdSql.query("select udf1(num1) , top(num1,1) from tb;") + tdSql.checkRows(1) + tdSql.query("select udf1(num1) , bottom(num1,1) from tb;") + tdSql.checkRows(1) + # tdSql.query("select udf1(num1) , last_row(num1) from tb;") + # tdSql.checkRows(1) + + # tdSql.query("select round(num1) , last_row(num1) from tb;") + # tdSql.checkRows(1) + + + # stable + tdSql.query("select udf1(c1) , max(c1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select abs(c1) , max(c1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select udf1(c1) , min(c1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select floor(c1) , min(c1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select udf1(c1) , first(c1) from stb1;") + + tdSql.query("select udf1(c1) , last(c1) from stb1;") + + tdSql.query("select udf1(c1) , top(c1 ,1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select abs(c1) , top(c1 ,1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select udf1(c1) , bottom(c1,1) from stb1;") + tdSql.checkRows(1) + tdSql.query("select ceil(c1) , bottom(c1,1) from stb1;") + tdSql.checkRows(1) + + # tdSql.query("select udf1(c1) , last_row(c1) from stb1;") + # tdSql.checkRows(1) + # tdSql.query("select ceil(c1) , last_row(c1) from stb1;") + # tdSql.checkRows(1) + + # regular table with compute functions + + tdSql.query("select udf1(num1) , abs(num1) from tb;") + tdSql.checkRows(12) + tdSql.query("select floor(num1) , abs(num1) from tb;") + tdSql.checkRows(12) + + # # bug need fix + + #tdSql.query("select udf1(num1) , csum(num1) from tb;") + #tdSql.checkRows(9) + #tdSql.query("select ceil(num1) , csum(num1) from tb;") + #tdSql.checkRows(9) + #tdSql.query("select udf1(c1) , csum(c1) from stb1;") + #tdSql.checkRows(22) + #tdSql.query("select floor(c1) , csum(c1) from stb1;") + #tdSql.checkRows(22) + + # stable with compute functions + tdSql.query("select udf1(c1) , abs(c1) from stb1;") + tdSql.checkRows(25) + tdSql.query("select abs(c1) , ceil(c1) from stb1;") + tdSql.checkRows(25) + + # nest query + tdSql.query("select abs(udf1(c1)) , abs(ceil(c1)) from stb1 order by ts;") + tdSql.checkRows(25) + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,None) + tdSql.checkData(1,0,88) + tdSql.checkData(1,1,8) + + tdSql.query("select abs(udf1(c1)) , abs(ceil(c1)) from ct1 order by ts;") + tdSql.checkRows(13) + tdSql.checkData(0,0,88) + tdSql.checkData(0,1,8) + tdSql.checkData(1,0,88) + tdSql.checkData(1,1,7) + + # bug fix for crash + # order by udf function result + for _ in range(50): + tdSql.query("select udf2(c1) from stb1 group by 1-udf1(c1)") + print(tdSql.queryResult) + + # udf functions with filter + + tdSql.query("select abs(udf1(c1)) , abs(ceil(c1)) from stb1 where c1 is null order by ts;") + tdSql.checkRows(3) + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,None) + + tdSql.query("select c1 ,udf1(c1) , c6 ,udf1(c6) from stb1 where c1 > 8 order by ts") + tdSql.checkRows(3) + tdSql.checkData(0,0,9) + tdSql.checkData(0,1,88) + tdSql.checkData(0,2,-99.990000000) + tdSql.checkData(0,3,88) + + tdSql.query("select sub1.c1, sub2.c2 from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,0) + tdSql.checkData(0,1,0) + tdSql.checkData(1,0,1) + tdSql.checkData(1,1,10) + + tdSql.query("select udf1(sub1.c1), udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,88) + tdSql.checkData(0,1,88) + tdSql.checkData(1,0,88) + tdSql.checkData(1,1,88) + + tdSql.query("select sub1.c1 , udf1(sub1.c1), sub2.c2 ,udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,0) + tdSql.checkData(0,1,88) + tdSql.checkData(0,2,0) + tdSql.checkData(0,3,88) + tdSql.checkData(1,0,1) + tdSql.checkData(1,1,88) + tdSql.checkData(1,2,10) + tdSql.checkData(1,3,88) + + tdSql.query("select udf2(sub1.c1), udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,16.881943016) + tdSql.checkData(0,1,168.819430161) + tdSql.error("select sub1.c1 , udf2(sub1.c1), sub2.c2 ,udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + + # udf functions with group by + tdSql.query("select udf1(c1) from ct1 group by c1") + tdSql.checkRows(10) + tdSql.query("select udf1(c1) from stb1 group by c1") + tdSql.checkRows(11) + tdSql.query("select c1,c2, udf1(c1,c2) from ct1 group by c1,c2") + tdSql.checkRows(10) + tdSql.query("select c1,c2, udf1(c1,c2) from stb1 group by c1,c2") + tdSql.checkRows(11) + + tdSql.query("select udf2(c1) from ct1 group by c1") + tdSql.checkRows(10) + tdSql.query("select udf2(c1) from stb1 group by c1") + tdSql.checkRows(11) + tdSql.query("select c1,c2, udf2(c1,c6) from ct1 group by c1,c2") + tdSql.checkRows(10) + tdSql.query("select c1,c2, udf2(c1,c6) from stb1 group by c1,c2") + tdSql.checkRows(11) + tdSql.query("select udf2(c1) from stb1 group by udf1(c1)") + tdSql.checkRows(2) + tdSql.query("select udf2(c1) from stb1 group by floor(c1)") + tdSql.checkRows(11) + + # udf mix with order by + tdSql.query("select udf2(c1) from stb1 group by floor(c1) order by udf2(c1)") + tdSql.checkRows(11) + + + def multi_cols_udf(self): + tdSql.query("select num1,num2,num3,udf1(num1,num2,num3) from tb") + tdSql.checkData(0,0,None) + tdSql.checkData(0,1,1) + tdSql.checkData(0,2,1.000000000) + tdSql.checkData(0,3,None) + tdSql.checkData(1,0,1) + tdSql.checkData(1,1,1) + tdSql.checkData(1,2,1.110000000) + tdSql.checkData(1,3,88) + + tdSql.query("select c1,c6,udf1(c1,c6) from stb1 order by ts") + tdSql.checkData(1,0,8) + tdSql.checkData(1,1,88.880000000) + tdSql.checkData(1,2,88) + + tdSql.query("select abs(udf1(c1,c6,c1,c6)) , abs(ceil(c1)) from stb1 where c1 is not null order by ts;") + tdSql.checkRows(22) + + tdSql.query("select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,169.661427555) + tdSql.checkData(0,1,169.661427555) + + def try_query_sql(self): + udf1_sqls = [ + "select num1 , udf1(num1) ,num2 ,udf1(num2),num3 ,udf1(num3),num4 ,udf1(num4) from tb" , + "select c1 , udf1(c1) ,c2 ,udf1(c2), c3 ,udf1(c3), c4 ,udf1(c4) from stb1 order by c1" , + "select udf1(num1) , max(num1) from tb;" , + "select udf1(num1) , min(num1) from tb;" , + #"select udf1(num1) , top(num1,1) from tb;" , + #"select udf1(num1) , bottom(num1,1) from tb;" , + "select udf1(c1) , max(c1) from stb1;" , + "select udf1(c1) , min(c1) from stb1;" , + #"select udf1(c1) , top(c1 ,1) from stb1;" , + #"select udf1(c1) , bottom(c1,1) from stb1;" , + "select udf1(num1) , abs(num1) from tb;" , + #"select udf1(num1) , csum(num1) from tb;" , + #"select udf1(c1) , csum(c1) from stb1;" , + "select udf1(c1) , abs(c1) from stb1;" , + "select abs(udf1(c1)) , abs(ceil(c1)) from stb1 order by ts;" , + "select abs(udf1(c1)) , abs(ceil(c1)) from ct1 order by ts;" , + "select abs(udf1(c1)) , abs(ceil(c1)) from stb1 where c1 is null order by ts;" , + "select c1 ,udf1(c1) , c6 ,udf1(c6) from stb1 where c1 > 8 order by ts" , + "select udf1(sub1.c1), udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select sub1.c1 , udf1(sub1.c1), sub2.c2 ,udf1(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf1(c1) from ct1 group by c1" , + "select udf1(c1) from stb1 group by c1" , + "select c1,c2, udf1(c1,c2) from ct1 group by c1,c2" , + "select c1,c2, udf1(c1,c2) from stb1 group by c1,c2" , + "select num1,num2,num3,udf1(num1,num2,num3) from tb" , + "select c1,c6,udf1(c1,c6) from stb1 order by ts" , + "select abs(udf1(c1,c6,c1,c6)) , abs(ceil(c1)) from stb1 where c1 is not null order by ts;" + ] + udf2_sqls = ["select udf2(sub1.c1), udf2(sub2.c2) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(c1) from stb1 group by 1-udf1(c1)" , + "select udf2(num1) ,udf2(num2), udf2(num3) from tb" , + "select udf2(num1)+100 ,udf2(num2)-100, udf2(num3)*100 ,udf2(num3)/100 from tb" , + "select udf2(c1) ,udf2(c6) from stb1 " , + "select udf2(c1)+100 ,udf2(c6)-100 ,udf2(c1)*100 ,udf2(c6)/100 from stb1 " , + "select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from ct1" , + "select udf2(c1+100) ,udf2(c6-100) ,udf2(c1*100) ,udf2(c6/100) from stb1 " , + "select udf2(c1) from ct1 group by c1" , + "select udf2(c1) from stb1 group by c1" , + "select c1,c2, udf2(c1,c6) from ct1 group by c1,c2" , + "select c1,c2, udf2(c1,c6) from stb1 group by c1,c2" , + "select udf2(c1) from stb1 group by udf1(c1)" , + "select udf2(c1) from stb1 group by floor(c1)" , + "select udf2(c1) from stb1 group by floor(c1) order by udf2(c1)" , + + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null" , + "select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null"] + + return udf1_sqls ,udf2_sqls + + + + def unexpected_create(self): + + tdLog.info(" create function with out bufsize ") + tdSql.query("drop function udf1 ") + tdSql.query("drop function udf2 ") + + # create function without buffer + tdSql.execute("create function udf1 as '%s' outputtype int"%self.libudf1) + tdSql.execute("create aggregate function udf2 as '%s' outputtype double"%self.libudf2) + udf1_sqls ,udf2_sqls = self.try_query_sql() + + for scalar_sql in udf1_sqls: + tdSql.query(scalar_sql) + for aggregate_sql in udf2_sqls: + tdSql.error(aggregate_sql) + + # create function without aggregate + + tdLog.info(" create function with out aggregate ") + tdSql.query("drop function udf1 ") + tdSql.query("drop function udf2 ") + + # create function without buffer + tdSql.execute("create aggregate function udf1 as '%s' outputtype int bufSize 8 "%self.libudf1) + tdSql.execute("create function udf2 as '%s' outputtype double bufSize 8"%self.libudf2) + udf1_sqls ,udf2_sqls = self.try_query_sql() + + for scalar_sql in udf1_sqls: + tdSql.error(scalar_sql) + for aggregate_sql in udf2_sqls: + tdSql.error(aggregate_sql) + + tdSql.execute(" create function db as '%s' outputtype int bufSize 8 "%self.libudf1) + tdSql.execute(" create aggregate function test as '%s' outputtype int bufSize 8 "%self.libudf1) + tdSql.error(" select db(c1) from stb1 ") + tdSql.error(" select db(c1,c6), db(c6) from stb1 ") + tdSql.error(" select db(num1,num2), db(num1) from tb ") + tdSql.error(" select test(c1) from stb1 ") + tdSql.error(" select test(c1,c6), test(c6) from stb1 ") + tdSql.error(" select test(num1,num2), test(num1) from tb ") + + + + def loop_kill_udfd(self): + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + + cfgPath = buildPath + "/../sim/dnode1/cfg" + udfdPath = buildPath +'/build/bin/udfd' + + for i in range(3): + + tdLog.info(" loop restart udfd %d_th" % i) + + tdSql.query("select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,169.661427555) + tdSql.checkData(0,1,169.661427555) + # stop udfd cmds + get_processID = "ps -ef | grep -w udfd | grep -v grep| grep -v defunct | awk '{print $2}'" + processID = subprocess.check_output(get_processID, shell=True).decode("utf-8") + stop_udfd = " kill -9 %s" % processID + os.system(stop_udfd) + + time.sleep(2) + + tdSql.query("select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,169.661427555) + tdSql.checkData(0,1,169.661427555) + + # # start udfd cmds + # start_udfd = "nohup " + udfdPath +'-c' +cfgPath +" > /dev/null 2>&1 &" + # tdLog.info("start udfd : %s " % start_udfd) + + def test_function_name(self): + tdLog.info(" create function name is not build_in functions ") + tdSql.execute(" drop function udf1 ") + tdSql.execute(" drop function udf2 ") + tdSql.error("create function max as '%s' outputtype int bufSize 8"%self.libudf1) + tdSql.error("create aggregate function sum as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create function max as '%s' outputtype int bufSize 8"%self.libudf1) + tdSql.error("create aggregate function sum as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function tbname as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function function as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function stable as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function union as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function 123 as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function 123db as '%s' outputtype double bufSize 8"%self.libudf2) + tdSql.error("create aggregate function mnode as '%s' outputtype double bufSize 8"%self.libudf2) + + def restart_taosd_query_udf(self): + + self.create_udf_function() + + for i in range(5): + tdLog.info(" this is %d_th restart taosd " %i) + tdSql.execute("use db ") + tdSql.query("select count(*) from stb1") + tdSql.checkRows(1) + tdSql.query("select udf2(sub1.c1 ,sub1.c2), udf2(sub2.c2 ,sub2.c1) from sub1, sub2 where sub1.ts=sub2.ts and sub1.c1 is not null") + tdSql.checkData(0,0,169.661427555) + tdSql.checkData(0,1,169.661427555) + tdDnodes.stop(1) + tdDnodes.start(1) + time.sleep(2) + + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + + print(" env is ok for all ") + self.prepare_udf_so() + self.prepare_data() + self.create_udf_function() + self.basic_udf_query() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 3b0dd76a30..ffa7befd69 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -11,6 +11,8 @@ python3 ./test.py -f 0-others/udfTest.py python3 ./test.py -f 0-others/udf_create.py python3 ./test.py -f 0-others/udf_restart_taosd.py python3 ./test.py -f 0-others/cachelast.py +python3 ./test.py -f 0-others/udf_cfg1.py +python3 ./test.py -f 0-others/udf_cfg2.py python3 ./test.py -f 0-others/user_control.py python3 ./test.py -f 0-others/fsync.py From edfd4b5175d00d5a0db23f88d648afd6845abd1c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 14 Jul 2022 18:51:34 +0800 Subject: [PATCH 2/6] enh: rsma support async commit --- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaCommit.c | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7b298ba830..394c314819 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -167,6 +167,7 @@ int32_t smaBegin(SSma* pSma); int32_t smaPreCommit(SSma* pSma); int32_t smaCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); +int32_t smaAsyncCommit(SSma* pSma); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index e241c14fc7..889c4adf24 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -18,6 +18,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaCommitImpl(SSma *pSma); static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); /** * @brief Only applicable to Rollup SMA @@ -43,6 +44,14 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } */ int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } +/** + * @brief Only applicable to Rollup SMA + * + * @param pSma + * @return int32_t + */ +int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } + /** * @brief set rsma trigger stat active * @@ -224,3 +233,15 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { regfree(®ex); return TSDB_CODE_SUCCESS; } + +/** + * @brief Rsma async commit implementation + * 1) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) + * + * @param pSma + * @return int32_t + */ +static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { + // TODO + return TSDB_CODE_SUCCESS; +} From ed34f490ca883533e76f5f02d488a8f4dcdd5122 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 15 Jul 2022 16:06:41 +0800 Subject: [PATCH 3/6] enh: async commit api for rsma --- source/dnode/vnode/src/inc/sma.h | 35 +++++-- source/dnode/vnode/src/inc/vnodeInt.h | 9 +- source/dnode/vnode/src/sma/smaCommit.c | 126 ++++++++++++++++++++--- source/dnode/vnode/src/sma/smaEnv.c | 4 +- source/dnode/vnode/src/sma/smaRollup.c | 52 ++++------ source/dnode/vnode/src/vnd/vnodeCommit.c | 4 +- 6 files changed, 168 insertions(+), 62 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index e767d94ebd..cfeb4ebeb2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -67,7 +67,9 @@ struct SRSmaStat { int64_t submitVer; int64_t refId; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks + int8_t commitStat; // 0 not in committing, 1 in committing SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash }; struct SSmaStat { @@ -78,12 +80,29 @@ struct SSmaStat { T_REF_DECLARE() }; -#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) -#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) -#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) -#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) -#define RSMA_REF_ID(r) ((r)->refId) -#define RSMA_SUBMIT_VER(r) ((r)->submitVer) +#define SMA_TSMA_STAT(s) (&(s)->tsmaStat) +#define SMA_RSMA_STAT(s) (&(s)->rsmaStat) +#define RSMA_INFO_HASH(r) ((r)->rsmaInfoHash) +#define RSMA_IMU_INFO_HASH(r) ((r)->iRsmaInfoHash) +#define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) +#define RSMA_COMMIT_STAT(r) (&(r)->commitStat) +#define RSMA_REF_ID(r) ((r)->refId) +#define RSMA_SUBMIT_VER(r) ((r)->submitVer) + +struct SRSmaInfoItem { + void *taskInfo; // qTaskInfo_t + int64_t refId; + tmr_h tmrId; + int32_t maxDelay; + int8_t level; + int8_t triggerStat; +}; + +struct SRSmaInfo { + STSchema *pTSchema; + int64_t suid; + SRSmaInfoItem items[TSDB_RETENTION_L2]; +}; enum { TASK_TRIGGER_STAT_INIT = 0, @@ -94,6 +113,8 @@ enum { TASK_TRIGGER_STAT_DROPPED = 5, }; +#define RSMA_TASK_INFO_HASH_SLOT 8 + void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -183,7 +204,7 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED); } } - +void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 394c314819..3b32ca9c60 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -164,10 +164,11 @@ void smaCleanUp(); int32_t smaOpen(SVnode* pVnode); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); -int32_t smaPreCommit(SSma* pSma); -int32_t smaCommit(SSma* pSma); -int32_t smaPostCommit(SSma* pSma); -int32_t smaAsyncCommit(SSma* pSma); +int32_t smaSyncPreCommit(SSma* pSma); +int32_t smaSyncCommit(SSma* pSma); +int32_t smaSyncPostCommit(SSma* pSma); +int32_t smaAsyncPreCommit(SSma* pSma); +int32_t smaAsyncPostCommit(SSma* pSma); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 889c4adf24..4729187e8c 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -18,7 +18,8 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaCommitImpl(SSma *pSma); static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); /** * @brief Only applicable to Rollup SMA @@ -26,7 +27,7 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); * @param pSma * @return int32_t */ -int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); } +int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); } /** * @brief Only applicable to Rollup SMA @@ -34,7 +35,7 @@ int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); } * @param pSma * @return int32_t */ -int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } +int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } /** * @brief Only applicable to Rollup SMA @@ -42,7 +43,7 @@ int32_t smaCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } * @param pSma * @return int32_t */ -int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } +int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } /** * @brief Only applicable to Rollup SMA @@ -50,7 +51,15 @@ int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } * @param pSma * @return int32_t */ -int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } +int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } + +/** + * @brief Only applicable to Rollup SMA + * + * @param pSma + * @return int32_t + */ +int32_t smaAsyncPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } /** * @brief set rsma trigger stat active @@ -71,18 +80,17 @@ int32_t smaBegin(SSma *pSma) { atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED, TASK_TRIGGER_STAT_ACTIVE); switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d rsma trigger stat from paused to active", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma trigger stat from paused to active", SMA_VID(pSma)); break; } case TASK_TRIGGER_STAT_INIT: { atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); - smaDebug("vgId:%d rsma trigger stat from init to active", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma trigger stat from init to active", SMA_VID(pSma)); break; } default: { atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE); - smaWarn("vgId:%d rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat); - ASSERT(0); + smaError("vgId:%d, rsma trigger stat %" PRIi8 " is unexpected", SMA_VID(pSma), rsmaTriggerStat); break; } } @@ -90,7 +98,7 @@ int32_t smaBegin(SSma *pSma) { } /** - * @brief pre-commit for rollup sma. + * @brief pre-commit for rollup sma(sync commit). * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. * 2) wait all triggered fetch tasks finished * 3) perform persist task for qTaskInfo @@ -107,8 +115,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - - // step 1: set persistence task paused + // step 1: set rsma stat paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); // step 2: wait all triggered fetch tasks finished @@ -236,12 +243,101 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { /** * @brief Rsma async commit implementation - * 1) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) + * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED + * 2) Wait all running fetch task finish to fetch and put submitMsg into level 2/3 wQueue(blocking level 1 write) + * 3) * * @param pSma * @return int32_t */ -static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { - // TODO +static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + + // step 1: set rsma stat paused + atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); + + // step 2: wait all triggered fetch tasks finished + int32_t nLoops = 0; + while (1) { + if (T_REF_VAL_GET(pStat) == 0) { + smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + break; + } else { + smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + } + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + + // step 3: swap rsmaInfoHash and iRsmaInfoHash + ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); + ASSERT(RSMA_INFO_HASH(pRSmaStat)); + + RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); + RSMA_INFO_HASH(pRSmaStat) = + taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (!RSMA_INFO_HASH(pRSmaStat)) { + smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty. + * + * @param pSma + * @return int32_t + */ +static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + + // step 1: merge rsmaInfoHash and iRsmaInfoHash + + if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { + // TODO: + } + + void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); + while (pIter) { + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + + if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { + taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); + smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); + } else { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = &pRSmaInfo->items[i]; + if (pItem->taskInfo) { + tdFreeQTaskInfo(pItem->taskInfo, SMA_VID(pSma), i + 1); + } + } + smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), + *pSuid); + } + + pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + } + + taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); + RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 4a7d4db874..77377cf089 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -17,7 +17,6 @@ typedef struct SSmaStat SSmaStat; -#define RSMA_TASK_INFO_HASH_SLOT 8 #define SMA_MGMT_REF_NUM 10240 extern SSmaMgmt smaMgmt; @@ -311,7 +310,6 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma), RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr()); - ASSERT(0); } else { smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma), RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId); @@ -361,7 +359,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { } break; default: - smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType); + smaError("vgId:%d, undefined smaType:%", SMA_VID(pSma), smaType); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index a6d56acbfb..300a0ec4be 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -48,20 +48,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed); static int32_t tdRSmaRestoreTSDataReload(SSma *pSma, int64_t committed); -struct SRSmaInfoItem { - void *taskInfo; // qTaskInfo_t - int64_t refId; - tmr_h tmrId; - int32_t maxDelay; - int8_t level; - int8_t triggerStat; -}; -struct SRSmaInfo { - STSchema *pTSchema; - int64_t suid; - SRSmaInfoItem items[TSDB_RETENTION_L2]; -}; static SRSmaInfo *tdGetRSmaInfoByItem(SRSmaInfoItem *pItem) { // adapt accordingly if definition of SRSmaInfo update @@ -102,7 +89,7 @@ static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } -static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { +void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { @@ -123,7 +110,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { i + 1); taosTmrStopA(&pItem->tmrId); } - tdFreeTaskHandle(&pItem->taskInfo, SMA_VID(pSma), i + 1); + tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1); } else { smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), pInfo->suid, i + 1); @@ -257,22 +244,21 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx) { - SRetention *pRetention = SMA_RETENTION(pSma); - STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + if ((param->qmsgLen > 0) && param->qmsg[idx]) { + SRetention *pRetention = SMA_RETENTION(pSma); + STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + SReadHandle handle = { + .meta = pSma->pVnode->pMeta, + .vnode = pSma->pVnode, + .initTqReader = 1, + }; - SReadHandle handle = { - .meta = pSma->pVnode->pMeta, - .vnode = pSma->pVnode, - .initTqReader = 1, - }; - - if (param->qmsg[idx]) { SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->refId = RSMA_REF_ID(pStat); pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); if (!pItem->taskInfo) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; - goto _err; + return TSDB_CODE_FAILED; } pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { @@ -286,13 +272,11 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; - smaInfo("vgId:%d table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 + smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); } return TSDB_CODE_SUCCESS; -_err: - return TSDB_CODE_FAILED; } /** @@ -562,7 +546,9 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche SSDataBlock *output = NULL; uint64_t ts; if (qExecTask(pItem->taskInfo, &output, &ts) < 0) { - ASSERT(false); + smaError("vgId:%d, qExecTask for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma), suid, + pItem->level, terrstr()); + goto _err; } if (!output) { break; @@ -572,7 +558,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche pResult = taosArrayInit(1, sizeof(SSDataBlock)); if (!pResult) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + goto _err; } } @@ -891,7 +877,7 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; _err: - smaError("vgId:%d failed to restore rsma task since %s", SMA_VID(pSma), terrstr()); + smaError("vgId:%d, failed to restore rsma task since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } @@ -1217,6 +1203,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); + if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { + taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, + &pItem->tmrId); + } return; } default: diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index ebbb691e28..4e55c05a47 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -233,7 +233,7 @@ int vnodeCommit(SVnode *pVnode) { walBeginSnapshot(pVnode->pWal, pVnode->state.applied); // preCommit - smaPreCommit(pVnode->pSma); + smaSyncPreCommit(pVnode->pSma); // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { @@ -276,7 +276,7 @@ int vnodeCommit(SVnode *pVnode) { pVnode->state.committed = info.state.committed; // postCommit - smaPostCommit(pVnode->pSma); + smaSyncPostCommit(pVnode->pSma); // apply the commit (TODO) walEndSnapshot(pVnode->pWal); From 16750e1401065edf70acd392af4f8aca5c64cd29 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 16 Jul 2022 17:12:45 +0800 Subject: [PATCH 4/6] enh: rsma async commit support --- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/dnode/vnode/src/inc/sma.h | 61 +++++------ source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaCommit.c | 125 ++++++++++++++++------- source/dnode/vnode/src/sma/smaEnv.c | 12 +-- source/dnode/vnode/src/sma/smaRollup.c | 103 ++++++++++++++----- source/dnode/vnode/src/sma/smaUtil.c | 95 +++++++++++++++++ source/dnode/vnode/src/vnd/vnodeCommit.c | 8 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 9 files changed, 299 insertions(+), 110 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 88629ebc69..bff3f19e99 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -709,7 +709,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - mError("sma:%s, failed to create since %s", createReq.name, terrstr(terrno)); + mError("sma:%s, failed to create since %s", createReq.name, terrstr()); } mndReleaseStb(pMnode, pStb); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index cfeb4ebeb2..1c8434594d 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -32,6 +32,8 @@ extern "C" { #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +#define RSMA_TASK_INFO_HASH_SLOT 8 + typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; typedef struct STSmaStat STSmaStat; @@ -41,7 +43,7 @@ typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfoItem SRSmaInfoItem; struct SSmaEnv { - TdThreadRwlock lock; + SRWLatch lock; int8_t type; SSmaStat *pStat; }; @@ -52,7 +54,7 @@ typedef struct { void *tmrHandle; // shared by all fetch tasks } SSmaMgmt; -#define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_LOCK(env) (&(env)->lock) #define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_STAT(env) ((env)->pStat) @@ -64,12 +66,14 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; - int64_t submitVer; - int64_t refId; // shared by fetch tasks - int8_t triggerStat; // shared by fetch tasks - int8_t commitStat; // 0 not in committing, 1 in committing - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; - SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash + int64_t commitAppliedVer; // vnode applied version for async commit + int64_t commitSubmitVer; // rsma submit version for async commit + int64_t submitVer; // latest submit version + int64_t refId; // shared by fetch tasks + int8_t triggerStat; // shared by fetch tasks + int8_t commitStat; // 0 not in committing, 1 in committing + SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash }; struct SSmaStat { @@ -113,7 +117,13 @@ enum { TASK_TRIGGER_STAT_DROPPED = 5, }; -#define RSMA_TASK_INFO_HASH_SLOT 8 +enum { + RSMA_ROLE_CREATE = 0, + RSMA_ROLE_DROP = 1, + RSMA_ROLE_FETCH = 2, + RSMA_ROLE_SUBMIT = 3, + RSMA_ROLE_ITERATE = 4, +}; void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -133,33 +143,6 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); int32_t tdLockSma(SSma *pSma); int32_t tdUnLockSma(SSma *pSma); -static FORCE_INLINE int32_t tdRLockSmaEnv(SSmaEnv *pEnv) { - int code = taosThreadRwlockRdlock(&(pEnv->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} - -static FORCE_INLINE int32_t tdWLockSmaEnv(SSmaEnv *pEnv) { - int code = taosThreadRwlockWrlock(&(pEnv->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} - -static FORCE_INLINE int32_t tdUnLockSmaEnv(SSmaEnv *pEnv) { - int code = taosThreadRwlockUnlock(&(pEnv->lock)); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - return -1; - } - return 0; -} - static FORCE_INLINE int8_t tdSmaStat(STSmaStat *pTStat) { if (pTStat) { return atomic_load_8(&pTStat->state); @@ -204,11 +187,13 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { atomic_or_fetch_8(&pTStat->state, TSDB_SMA_STAT_DROPPED); } } + +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); -void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat); +void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdProcessRSmaRestoreImpl(SSma *pSma); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3b32ca9c60..9186b4794a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -168,6 +168,7 @@ int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma); +int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 4729187e8c..10f750f58d 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -15,11 +15,13 @@ #include "sma.h" -static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); +static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); +static int32_t tdClearupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); /** * @brief Only applicable to Rollup SMA @@ -27,7 +29,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); * @param pSma * @return int32_t */ -int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); } +int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); } /** * @brief Only applicable to Rollup SMA @@ -35,7 +37,7 @@ int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaPreCommitImpl(pSma); * @param pSma * @return int32_t */ -int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } +int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); } /** * @brief Only applicable to Rollup SMA @@ -43,7 +45,7 @@ int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaCommitImpl(pSma); } * @param pSma * @return int32_t */ -int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma); } +int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); } /** * @brief Only applicable to Rollup SMA @@ -53,6 +55,14 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaPostCommitImpl(pSma) */ int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +/** + * @brief Only applicable to Rollup SMA + * + * @param pSma + * @return int32_t + */ +int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } + /** * @brief Only applicable to Rollup SMA * @@ -106,7 +116,7 @@ int32_t smaBegin(SSma *pSma) { * @param pSma * @return int32_t */ -static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { +static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); if (!pSmaEnv) { return TSDB_CODE_SUCCESS; @@ -135,7 +145,9 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { } // step 3: perform persist task for qTaskInfo - tdRSmaPersistExecImpl(pRSmaStat); + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + pRSmaStat->commitSubmitVer = pRSmaStat->submitVer; + tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma)); @@ -148,7 +160,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { * @param pSma * @return int32_t */ -static int32_t tdProcessRSmaCommitImpl(SSma *pSma) { +static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); if (!pSmaEnv) { return TSDB_CODE_SUCCESS; @@ -156,21 +168,9 @@ static int32_t tdProcessRSmaCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } -/** - * @brief post-commit for rollup sma - * 1) clean up the outdated qtaskinfo files - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { - SVnode *pVnode = pSma->pVnode; - - if (!VND_IS_RSMA(pVnode)) { - return TSDB_CODE_SUCCESS; - } - - int64_t committed = pVnode->state.committed; +static int32_t tdClearupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { + SVnode *pVnode = pSma->pVnode; + int64_t committed = pRSmaStat->commitAppliedVer; TdDirPtr pDir = NULL; TdDirEntryPtr pDirEntry = NULL; char dir[TSDB_FILENAME_LEN]; @@ -238,6 +238,29 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { taosCloseDir(&pDir); regfree(®ex); + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief post-commit for rollup sma + * 1) clean up the outdated qtaskinfo files + * + * @param pSma + * @return int32_t + */ +static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { + SVnode *pVnode = pSma->pVnode; + if (!VND_IS_RSMA(pVnode)) { + return TSDB_CODE_SUCCESS; + } + + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv)); + + // cleanup outdated qtaskinfo files + tdClearupQTaskInfoFiles(pSma, pRSmaStat); + return TSDB_CODE_SUCCESS; } @@ -259,8 +282,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - // step 1: set rsma stat paused + // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); // step 2: wait all triggered fetch tasks finished int32_t nLoops = 0; @@ -285,19 +309,45 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); RSMA_INFO_HASH(pRSmaStat) = taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); + if (!RSMA_INFO_HASH(pRSmaStat)) { smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } + // step 4: others + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + pRSmaStat->commitSubmitVer = pRSmaStat->submitVer; + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief commit for rollup sma + * + * @param pSma + * @return int32_t + */ +static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); + + // perform persist task for qTaskInfo + tdRSmaPersistExecImpl(pRSmaStat, RSMA_IMU_INFO_HASH(pRSmaStat)); + return TSDB_CODE_SUCCESS; } /** * @brief Migrate rsmaInfo from iRsmaInfo to rsmaInfo if rsmaInfoHash not empty. - * - * @param pSma - * @return int32_t + * + * @param pSma + * @return int32_t */ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); @@ -309,9 +359,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); // step 1: merge rsmaInfoHash and iRsmaInfoHash + taosWLockLatch(SMA_ENV_LOCK(pSmaEnv)); if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { - // TODO: + // TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty } void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); @@ -320,15 +371,12 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); - smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); + smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), + *pSuid); } else { + // free the resources SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - SRSmaInfoItem *pItem = &pRSmaInfo->items[i]; - if (pItem->taskInfo) { - tdFreeQTaskInfo(pItem->taskInfo, SMA_VID(pSma), i + 1); - } - } + tdFreeRSmaInfo(pSma, pRSmaInfo, false); smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), *pSuid); } @@ -339,5 +387,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; + taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv)); + + // step 2: cleanup outdated qtaskinfo files + tdClearupQTaskInfoFiles(pSma, pRSmaStat); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 77377cf089..577d5fd4fa 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -108,12 +108,7 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) SMA_ENV_TYPE(pEnv) = smaType; - int code = taosThreadRwlockInit(&(pEnv->lock), NULL); - if (code) { - terrno = TAOS_SYSTEM_ERROR(code); - taosMemoryFree(pEnv); - return NULL; - } + taosInitRWLatch(&(pEnv->lock)); if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); @@ -147,7 +142,6 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEn void tdDestroySmaEnv(SSmaEnv *pSmaEnv) { if (pSmaEnv) { pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv)); - taosThreadRwlockDestroy(&(pSmaEnv->lock)); } } @@ -259,7 +253,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { void *infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), NULL); while (infoHash) { SRSmaInfo *pSmaInfo = *(SRSmaInfo **)infoHash; - tdFreeRSmaInfo(pSma, pSmaInfo); + tdFreeRSmaInfo(pSma, pSmaInfo, true); infoHash = taosHashIterate(RSMA_INFO_HASH(pStat), infoHash); } } @@ -359,7 +353,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { } break; default: - smaError("vgId:%d, undefined smaType:%", SMA_VID(pSma), smaType); + smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 300a0ec4be..32d6dee57f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -91,6 +91,7 @@ static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC + if (!taskHandle) return; qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); @@ -98,14 +99,23 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { } else { smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); } + // TODO: clear files related to qTaskInfo? } -void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { +/** + * @brief general function to free rsmaInfo + * + * @param pSma + * @param pInfo + * @param isDeepFree Only stop tmrId and free pTSchema for deep free + * @return void* + */ +void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (pInfo) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; if (pItem->taskInfo) { - if (pItem->tmrId) { + if (isDeepFree && pItem->tmrId) { smaDebug("vgId:%d, table %" PRIi64 " stop fetch timer %p level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, i + 1); taosTmrStopA(&pItem->tmrId); @@ -116,7 +126,9 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { pInfo->suid, i + 1); } } - taosMemoryFree(pInfo->pTSchema); + if (isDeepFree) { + taosMemoryFree(pInfo->pTSchema); + } taosMemoryFree(pInfo); } @@ -138,7 +150,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) if (!suid || !tbUids) { terrno = TSDB_CODE_INVALID_PTR; - smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); + smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr()); return TSDB_CODE_FAILED; } @@ -152,7 +164,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) if (pRSmaInfo->items[0].taskInfo) { if ((qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) { - smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); + smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr()); return TSDB_CODE_FAILED; } else { smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), @@ -162,7 +174,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) if (pRSmaInfo->items[1].taskInfo) { if ((qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) { - smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr(terrno)); + smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr()); return TSDB_CODE_FAILED; } else { smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), @@ -247,9 +259,10 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat if ((param->qmsgLen > 0) && param->qmsg[idx]) { SRetention *pRetention = SMA_RETENTION(pSma); STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + SVnode *pVnode = pSma->pVnode; SReadHandle handle = { - .meta = pSma->pVnode->pMeta, - .vnode = pSma->pVnode, + .meta = pVnode->pMeta, + .vnode = pVnode, .initTqReader = 1, }; @@ -274,7 +287,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; smaInfo("vgId:%d, table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, - SMA_VID(pSma), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); + TD_VID(pVnode), pRSmaInfo->suid, idx + 1, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); } return TSDB_CODE_SUCCESS; } @@ -341,7 +354,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_SUCCESS; _err: - tdFreeRSmaInfo(pSma, pRSmaInfo); + tdFreeRSmaInfo(pSma, pRSmaInfo, true); return TSDB_CODE_FAILED; } @@ -635,9 +648,18 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType return TSDB_CODE_SUCCESS; } +/** + * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. + * + * @param pSma + * @param suid + * @return SRSmaInfo* + */ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = NULL; + SRSmaInfo *pRSmaInfo = NULL; + if (!pEnv) { // only applicable when rsma env exists return NULL; @@ -648,11 +670,37 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - SRSmaInfo *pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); + if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + return pRSmaInfo; + } + + if (RSMA_COMMIT_STAT(pStat) == 0) { return NULL; } - return pRSmaInfo; + + // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat + SRSmaInfo *pCowRSmaInfo = NULL; + // lock + taosWLockLatch(SMA_ENV_LOCK(pEnv)); + void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); + if (iRSmaInfo) { + SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; + if (pIRSmaInfo) { + if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) { + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); + return NULL; + } + if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + return NULL; + } + } + } + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + return pCowRSmaInfo; } static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { @@ -881,6 +929,13 @@ _err: return TSDB_CODE_FAILED; } +/** + * @brief Restore from SRSmaQTaskInfoItem + * + * @param pSma + * @param pItem + * @return int32_t + */ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) { SRSmaInfo *pRSmaInfo = NULL; void *qTaskInfo = NULL; @@ -906,7 +961,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem * if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, - pItem->type, terrstr(terrno)); + pItem->type, terrstr()); return TSDB_CODE_FAILED; } smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, @@ -1053,26 +1108,27 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { return TSDB_CODE_SUCCESS; } -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; int32_t vid = SMA_VID(pSma); int64_t toffset = 0; bool isFileCreated = false; - if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { + if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; } - void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); + void *infoHash = taosHashIterate(pInfoHash, NULL); if (!infoHash) { return TSDB_CODE_SUCCESS; } STFile tFile = {0}; - if (RSMA_SUBMIT_VER(pRSmaStat) > 0) { +#if 0 + if (pRSmaStat->commitAppliedVer > 0) { char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName); + tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; @@ -1085,6 +1141,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { isFileCreated = true; } +#endif while (infoHash) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; @@ -1100,7 +1157,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { int8_t type = (int8_t)(i + 1); if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { smaError("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo failed since %s", vid, pRSmaInfo->suid, - i + 1, terrstr(terrno)); + i + 1, terrstr()); goto _err; } if (!pOutput || len <= 0) { @@ -1116,7 +1173,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { if (!isFileCreated) { char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName); + tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; @@ -1149,11 +1206,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { taosMemoryFree(pOutput); } - infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash); + infoHash = taosHashIterate(pInfoHash, infoHash); } if (isFileCreated) { - tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->submitVer); + tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->commitSubmitVer); if (tdUpdateTFileHeader(&tFile) < 0) { smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), tstrerror(terrno)); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 22d82e8df0..13befabed5 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -313,4 +313,99 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t return TSDB_CODE_SUCCESS; } + +static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param, + tb_uid_t suid, int8_t idx) { + SVnode *pVnode = pSma->pVnode; + char *pOutput = NULL; + int32_t len = 0; + + if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) { + smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + + SReadHandle handle = { + .meta = pVnode->pMeta, + .vnode = pVnode, + .initTqReader = 1, + }; + ASSERT(!dstTaskInfo); + dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); + if (!dstTaskInfo) { + terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; + goto _err; + } + + if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) { + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, + terrstr()); + goto _err; + } + + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); + + taosMemoryFreeClear(pOutput); + return TSDB_CODE_SUCCESS; +_err: + taosMemoryFreeClear(pOutput); + tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1); + return TSDB_CODE_FAILED; +} + +/** + * @brief pTSchema is shared + * + * @param pSma + * @param pDest + * @param pSrc + * @return int32_t + */ +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) { + SVnode *pVnode = pSma->pVnode; + SRSmaParam *param = NULL; + if (!pSrc) { + return TSDB_CODE_SUCCESS; + } + + if (!pDest) { + pDest = taosMemoryCalloc(1, sizeof(SRSmaInfo)); + if (!pDest) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } + + memcpy(pDest, pSrc, sizeof(SRSmaInfo)); + + SMetaReader mr = {0}; + metaReaderInit(&mr, SMA_META(pSma), 0); + smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid); + if (metaGetTableEntryByUid(&mr, pSrc->suid) < 0) { + smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, + terrstr()); + goto _err; + } + ASSERT(mr.me.type == TSDB_SUPER_TABLE); + ASSERT(mr.me.uid == pSrc->suid); + if (TABLE_IS_ROLLUP(mr.me.flags)) { + param = &mr.me.stbEntry.rsmaParam; + for (int i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = &pSrc->items[i]; + if (pItem->taskInfo) { + tdCloneQTaskInfo(pSma, pDest->items[i].taskInfo, pItem->taskInfo, param, pSrc->suid, i); + } + } + smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); + } + + metaReaderClear(&mr); + return TSDB_CODE_SUCCESS; +_err: + metaReaderClear(&mr); + tdFreeRSmaInfo(pSma, pDest, false); + return TSDB_CODE_FAILED; +} + // ... \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4e55c05a47..c969861241 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -233,7 +233,8 @@ int vnodeCommit(SVnode *pVnode) { walBeginSnapshot(pVnode->pWal, pVnode->state.applied); // preCommit - smaSyncPreCommit(pVnode->pSma); + // smaSyncPreCommit(pVnode->pSma); + smaAsyncPreCommit(pVnode->pSma); // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { @@ -242,6 +243,8 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { + smaAsyncCommit(pVnode->pSma); + if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); return -1; @@ -276,7 +279,8 @@ int vnodeCommit(SVnode *pVnode) { pVnode->state.committed = info.state.committed; // postCommit - smaSyncPostCommit(pVnode->pSma); + // smaSyncPostCommit(pVnode->pSma); + smaAsyncPostCommit(pVnode->pSma); // apply the commit (TODO) walEndSnapshot(pVnode->pWal); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index cca212a4e4..3761516bda 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -900,7 +900,7 @@ static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void * _err: tDecoderClear(&coder); vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 "for table %" PRIi64 " since %s", - TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno)); + TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr()); return -1; } From 0144ef524f0f049847842b45f34a167072270466 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 16 Jul 2022 17:16:57 +0800 Subject: [PATCH 5/6] other: api rename --- source/dnode/vnode/src/sma/smaCommit.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 10f750f58d..cdaaf2bbdb 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -21,7 +21,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); -static int32_t tdClearupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); +static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); /** * @brief Only applicable to Rollup SMA @@ -168,7 +168,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { return TSDB_CODE_SUCCESS; } -static int32_t tdClearupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { +static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { SVnode *pVnode = pSma->pVnode; int64_t committed = pRSmaStat->commitAppliedVer; TdDirPtr pDir = NULL; @@ -259,7 +259,7 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(SMA_ENV_STAT(pSmaEnv)); // cleanup outdated qtaskinfo files - tdClearupQTaskInfoFiles(pSma, pRSmaStat); + tdCleanupQTaskInfoFiles(pSma, pRSmaStat); return TSDB_CODE_SUCCESS; } @@ -390,7 +390,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv)); // step 2: cleanup outdated qtaskinfo files - tdClearupQTaskInfoFiles(pSma, pRSmaStat); + tdCleanupQTaskInfoFiles(pSma, pRSmaStat); return TSDB_CODE_SUCCESS; } From ba87537a3e74d9557dd3821131fd29ec8830a020 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 16 Jul 2022 17:45:23 +0800 Subject: [PATCH 6/6] feat: update taostools for3.0 (#14998) * feat: update taos-tools for 3.0 [TD-14141] * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 --- tools/taos-tools | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/taos-tools b/tools/taos-tools index bd496f76b6..b7b922268c 160000 --- a/tools/taos-tools +++ b/tools/taos-tools @@ -1 +1 @@ -Subproject commit bd496f76b64931c66da2f8b0f24143a98a881cde +Subproject commit b7b922268c4a06d9db77ffdfde0726f3d9900b72