From 5c5e242b7e84a3bac7862645a9a5fac384fe1daf Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Sat, 18 Jun 2022 17:08:57 +0800 Subject: [PATCH 1/6] test: add test case for tmq --- tests/pytest/util/common.py | 129 ++++++++++++++++++++++- tests/system-test/7-tmq/subscribeDb4.py | 116 ++++++++++++++++++++ tests/system-test/7-tmq/subscribeStb4.py | 22 ++-- tests/system-test/7-tmq/tmqCommon.py | 118 +++++++++++++++++++++ 4 files changed, 371 insertions(+), 14 deletions(-) create mode 100644 tests/system-test/7-tmq/subscribeDb4.py create mode 100644 tests/system-test/7-tmq/tmqCommon.py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 7b00e6f331..e158f8edd2 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -11,13 +11,20 @@ # -*- coding: utf-8 -*- +from collections import defaultdict import random import string -from util.sql import tdSql -from util.dnodes import tdDnodes import requests import time import socket + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + class TDCom: def init(self, conn, logSql): tdSql.init(conn.cursor(), logSql) @@ -170,4 +177,122 @@ class TDCom: def close(self): self.cursor.close() + def create_database(self,tsql, dbName='test',dropFlag=1,precision="ms", **kwargs): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + ''' + vgroups replica precision strict wal fsync comp cachelast single_stable buffer pagesize pages minrows maxrows duration keep retentions + ''' + sqlString = f'create database if not exists {dbName} precision "{precision}" vgroups 4' + if len(kwargs) > 0: + dbParams = "" + for param, value in kwargs.items(): + dbParams += f'{param} {value} ' + sqlString += f'{dbParams}' + + tsql.execute(sqlString) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName,columnDict,tagDict): + colSchema = '' + for i in range(columnDict['int']): + colSchema += ', c%d int'%i + tagSchema = '' + for i in range(tagDict['int']): + if i > 0: + tagSchema += ',' + tagSchema += 't%d int'%i + + tsql.execute("create table if not exists %s.%s (ts timestamp %s) tags(%s)"%(dbName, stbName, colSchema, tagSchema)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum,tagDict): + tsql.execute("use %s" %dbName) + tagsValues = '' + for i in range(tagDict['int']): + if i > 0: + tagsValues += ',' + tagsValues += '%d'%i + + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%s)"%(stbName,i,stbName,tagsValues) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, %d)"%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def getClientCfgPath(self): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + return cfgPath + + def newcur(self,host='localhost',port=6030,user='root',password='taosdata'): + cfgPath = self.getClientCfgPath() + con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port) + cur=con.cursor() + print(cur) + return cur + + def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata'): + newTdSql = TDSql() + cur = self.newcur(host=host,port=port,user=user,password=password) + newTdSql.init(cur, False) + return newTdSql + tdCom = TDCom() diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py new file mode 100644 index 0000000000..de475803fe --- /dev/null +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -0,0 +1,116 @@ +import sys +import time +import socket +import os +import threading + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + paraDict = {'dbName': 'db12', + 'dropFlag':1, + 'vgroups': 4, + 'precision': 'ms', + 'stbName': 'stb0', + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 0, # 1640966400000 ----> 2022-01-01 00:00:00.000 + 'event':'', + 'columnDict': {'int':2}, + 'tagDict': {'int':1} + } + + cdbName = 'cdb' + # some parameter to consumer processor + consumerId = 0 + expectrowcnt = 0 + topicList = '' + ifcheckdata = 0 + ifManualCommit = 1 + groupId = 'group.id:cgrp1' + autoCommit = 'enable.auto.commit:false' + autoCommitInterval = 'auto.commit.interval.ms:1000' + autoOffset = 'auto.offset.reset:earliest' + + pollDelay = 20 + showMsg = 1 + showRow = 1 + + hostname = socket.gethostname() + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + logSql = False + tdSql.init(conn.cursor(), logSql) + + def tmqCase12(self): + tdLog.printNoPrefix("======== test case 12: ") + tdLog.info("step 1: create database, stb, ctb and insert data") + + tmqCom.initConsumerTable(self.cdbName) + + tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) + + self.paraDict["stbName"] = 'stb1' + tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) + tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) + tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + + self.paraDict["stbName"] = 'stb2' + tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) + tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) + tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + + tdLog.info("create topics from db") + topicName1 = 'topic_%s'%(self.paraDict['dbName']) + tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName'])) + + topicList = topicName1 + keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset) + self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2 + tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) + + tdLog.info("After waiting for a period of time, drop one stable") + time.sleep(10) + tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) + + tdLog.info("wait result from consumer, then check it") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt): + tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + time.sleep(15) + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 12 end ...... ") + + def run(self): + tdSql.prepare() + self.tmqCase12() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeStb4.py b/tests/system-test/7-tmq/subscribeStb4.py index d8a9ca95b0..f3982c8f1f 100644 --- a/tests/system-test/7-tmq/subscribeStb4.py +++ b/tests/system-test/7-tmq/subscribeStb4.py @@ -196,16 +196,16 @@ class TDTestCase: auotCtbPrefix = 'autoCtb' # create and start thread - parameterDict = {'cfg': '', \ - 'actionType': 0, \ - 'dbName': 'db1', \ - 'dropFlag': 1, \ - 'vgroups': 4, \ - 'replica': 1, \ - 'stbName': 'stb1', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ - 'batchNum': 100, \ + parameterDict = {'cfg': '', + 'actionType': 0, + 'dbName': 'db1', + 'dropFlag': 1, + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb1', + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 100, 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -349,7 +349,5 @@ class TDTestCase: tdSql.close() tdLog.success(f"{__file__} successfully executed") -event = threading.Event() - tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py new file mode 100644 index 0000000000..68adaa4126 --- /dev/null +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -0,0 +1,118 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from collections import defaultdict +import random +import string +import threading +import requests +import time +# import socketfrom + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +# class actionType(Enum): +# CREATE_DATABASE = 0 +# CREATE_STABLE = 1 +# CREATE_CTABLE = 2 +# INSERT_DATA = 3 + +class TMQCom: + def init(self, conn, logSql): + tdSql.init(conn.cursor()) + # tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def syncCreateDbStbCtbInsertData(self, tsql, paraDict): + tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"], paraDict['precision']) + tdCom.create_stable(tsql, paraDict["dbName"],paraDict["stbName"], paraDict["columnDict"], paraDict["tagDict"]) + tdCom.create_ctables(tsql, paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["tagDict"]) + if "event" in paraDict and type(paraDict['event']) == type(threading.Event()): + paraDict["event"].set() + tdCom.insert_data(tsql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + return + + def threadFunction(self, **paraDict): + # create new connector for new tdSql instance in my thread + newTdSql = tdCom.newTdSql() + self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict) + return + + def asyncCreateDbStbCtbInsertData(self, paraDict): + pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict) + pThread.start() + return pThread + + def close(self): + self.cursor.close() + +tmqCom = TMQCom() From 00aa5abc92ce26a0be57e83f623a454141065de1 Mon Sep 17 00:00:00 2001 From: jiacy-jcy Date: Mon, 20 Jun 2022 09:31:58 +0800 Subject: [PATCH 2/6] update today.py --- tests/system-test/2-query/Today.py | 519 ++++++++--------------------- 1 file changed, 144 insertions(+), 375 deletions(-) diff --git a/tests/system-test/2-query/Today.py b/tests/system-test/2-query/Today.py index 9eb06de9fb..a9c3215a12 100644 --- a/tests/system-test/2-query/Today.py +++ b/tests/system-test/2-query/Today.py @@ -4,394 +4,163 @@ from util.dnodes import * from util.log import * from util.sql import * from util.cases import * +import datetime class TDTestCase: - updatecfgDict = {'rpcDebugFlag': '143'} def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) + self.today_date = datetime.datetime.strptime( + datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + self.time_unit = ['b','u','a','s','m','h','d','w'] + self.error_param = ['1.5','abc','!@#','"abc"','today()'] + self.arithmetic_operators = ['+','-','*','/'] + self.relational_operator = ['<','<=','=','>=','>'] + # prepare data + self.ntbname = 'ntb' + self.stbname = 'stb' + self.column_dict = { + 'ts':'timestamp', + 'c1':'int', + 'c2':'float', + 'c3':'double', + 'c4':'timestamp' + } + self.tag_dict = { + 't0':'int' + } + self.tbnum = 2 + self.tag_values = [ + f'10', + f'100' + ] + self.values_list = [f'now,1,1.55,100.555555,today()', + f'now+1d,10,11.11,99.999999,now()', + f'today(),3,3.333,333.333333,now()', + f'today()-1d,10,11.11,99.999999,now()', + f'today()+1d,1,1.55,100.555555,today()'] + + def set_create_normaltable_sql(self, ntbname, column_dict): + column_sql = '' + for k, v in column_dict.items(): + column_sql += f"{k} {v}," + create_ntb_sql = f'create table {ntbname} ({column_sql[:-1]})' + return create_ntb_sql + def set_create_stable_sql(self,stbname,column_dict,tag_dict): + column_sql = '' + tag_sql = '' + for k,v in column_dict.items(): + column_sql += f"{k} {v}," + for k,v in tag_dict.items(): + tag_sql += f"{k} {v}," + create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})' + return create_stb_sql + def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'): + for k,v in column_dict.items(): + num_up = 0 + num_down = 0 + num_same = 0 + if v.lower() == 'timestamp': + tdSql.query(f'select {k} from {tbname}') + for i in tdSql.queryResult: + if i[0] > self.today_date: + num_up += 1 + elif i[0] == self.today_date: + num_same += 1 + elif i[0] < self.today_date: + num_down += 1 + tdSql.query(f"select today() from {tbname}") + tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkData(0, 0, str(self.today_date)) + tdSql.query(f"select * from {tbname} where {k}=today()") + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same) + for i in [f'{tbname}',f'db.{tbname}']: + for unit in self.time_unit: + for symbol in ['+','-']: + tdSql.query(f"select today() {symbol}1{unit} from {i}") + tdSql.checkRows(len(values_list)*tb_num) + for unit in self.error_param: + for symbol in self.arithmetic_operators: + tdSql.error(f'select today() {symbol}{unit} from {i}') + for symbol in self.arithmetic_operators: + tdSql.query(f'select now(){symbol}null from {i}') + tdSql.checkData(0,0,None) + for symbol in self.relational_operator: + tdSql.query(f'select * from {i} where {k} {symbol} today()') + if symbol == '<' : + if tb == 'tb': + tdSql.checkRows(num_down*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_down) + elif symbol == '<=': + if tb == 'tb': + tdSql.checkRows((num_same+num_down)*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same+num_down) + elif symbol == '=': + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same) + elif symbol == '>=': + if tb == 'tb': + tdSql.checkRows((num_up + num_same)*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_up + num_same) + elif symbol == '>': + if tb == 'tb': + tdSql.checkRows(num_up*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_up) + tdSql.query(f"select today()/0 from {tbname}") + tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkData(0,0,None) + tdSql.query(f"select {k} from {tbname} where {k}=today()") + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + for i in range(num_same*tb_num): + tdSql.checkData(i, 0, str(self.today_date)) + elif tb == 'stb': + tdSql.checkRows(num_same) + for i in range(num_same): + tdSql.checkData(i, 0, str(self.today_date)) + def today_check_ntb(self): + tdSql.prepare() + tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict)) + for i in self.values_list: + tdSql.execute( + f'insert into {self.ntbname} values({i})') + self.data_check(self.column_dict,self.ntbname,self.values_list) + tdSql.execute('drop database db') + def today_check_stb_tb(self): + tdSql.prepare() + tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values ({j})') + # check child table + for i in range(self.tbnum): + self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list) + # check stable + self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb') + tdSql.execute('drop database db') def run(self): # sourcery skip: extract-duplicate-method - # for func now() , today(), timezone() - tdSql.prepare() - today_date = datetime.datetime.strptime( - datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + + tdLog.printNoPrefix("==========check today() for normal table ==========") + self.today_check_ntb() + tdLog.printNoPrefix("==========check today() for stable and child table==========") + self.today_check_stb_tb() - tdLog.printNoPrefix("==========step1:create tables==========") - tdSql.execute( - '''create table if not exists ntb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) - ''' - ) - tdSql.execute( - '''create table if not exists stb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int) - ''' - ) - tdSql.execute( - '''create table if not exists stb_1 using stb tags(100) - ''' - ) - tdLog.printNoPrefix("==========step2:insert data into ntb==========") - tdSql.execute( - 'insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') - tdSql.execute( - 'insert into stb_1 values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') - tdLog.printNoPrefix("==========step2:query test of ntb ==========") - - # test function today() - # normal table - tdSql.query("select today() from ntb") - tdSql.checkRows(3) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() +1w from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1w from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1d from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1d from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1h from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1h from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1m from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1m from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1s from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1s from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1a from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1a from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1u from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1u from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1b from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1b from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1w from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1w from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1d from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1d from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1h from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1h from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1m from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1m from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1s from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1s from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1a from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1a from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1u from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1u from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1b from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1b from db.ntb") - tdSql.checkRows(3) - tdSql.query("select * from ntb where ts=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 1, 3) - tdSql.query("select * from ntb where ts<=today()") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 10) - # for bug - # tdSql.query("select * from ntb where ts=today()") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 3) - # tdSql.query("select * from ntb where ts>today()") - # tdSql.checkRows(1) - tdSql.query("select c4 from ntb where c4=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from ntb where ts=today()") - # tdSql.checkRows(2) - # tdSql.query("select * from ntb where ts>today()") - # tdSql.checkRows(1) - tdSql.query("select c4 from stb where c4=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from stb where ts=today()") - tdSql.checkRows(2) - tdSql.query("select * from ntb where ts>today()") - tdSql.checkRows(1) - tdSql.query("select c4 from stb_1 where c4=today()") - tdSql.checkRows(1) - tdSql.query("select today() from stb_1 where ts Date: Mon, 20 Jun 2022 10:58:59 +0800 Subject: [PATCH 3/6] refactor(sync): get snapshot and create reader --- include/libs/sync/sync.h | 7 +++--- source/dnode/mnode/impl/src/mndSync.c | 13 +++++++++- source/dnode/vnode/src/vnd/vnodeSync.c | 6 ++--- source/libs/sync/inc/syncInt.h | 3 +++ source/libs/sync/src/syncAppendEntries.c | 4 ++-- source/libs/sync/src/syncAppendEntriesReply.c | 6 ++--- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncMain.c | 24 +++++++++---------- source/libs/sync/src/syncReplication.c | 2 +- source/libs/sync/src/syncSnapshot.c | 6 ++--- .../test/syncConfigChangeSnapshotTest.cpp | 6 ++--- .../libs/sync/test/syncConfigChangeTest.cpp | 6 ++--- source/libs/sync/test/syncRaftLogTest2.cpp | 2 +- source/libs/sync/test/syncRaftLogTest3.cpp | 12 +++++----- source/libs/sync/test/syncReplicateTest.cpp | 6 ++--- .../libs/sync/test/syncSnapshotSenderTest.cpp | 2 +- source/libs/sync/test/syncSnapshotTest.cpp | 6 ++--- source/libs/sync/test/syncTestTool.cpp | 2 +- 18 files changed, 65 insertions(+), 50 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e8c4faa240..dd9fd384ce 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -107,7 +107,9 @@ typedef struct SSyncFSM { void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); + + int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader); + int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); @@ -193,8 +195,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); -int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); -int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); + int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3e3850de1a..3a023bcece 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -73,7 +73,17 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { + // TODO: + + // atomic operation + // step1. sdbGetCommitInfo + // step2. create ppReader with pReaderParam + + return 0; +} + +int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); return 0; @@ -159,6 +169,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpReConfigCb = mndReConfig; pFsm->FpGetSnapshot = mndSyncGetSnapshot; + pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo; pFsm->FpSnapshotStartRead = mndSnapshotStartRead; pFsm->FpSnapshotStopRead = mndSnapshotStopRead; pFsm->FpSnapshotDoRead = mndSnapshotDoRead; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index ae61aa454e..297a486ee9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -353,8 +353,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c SyncIndex beginIndex = SYNC_INDEX_INVALID; char logBuf[256] = {0}; - if (pFsm->FpGetSnapshot != NULL) { - (*pFsm->FpGetSnapshot)(pFsm, &snapshot); + if (pFsm->FpGetSnapshotInfo != NULL) { + (*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -416,7 +416,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; - pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; + pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = NULL; pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b910b59f83..dcfea84f36 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -238,6 +238,9 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); +int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); + void syncStartNormal(int64_t rid); void syncStartStandBy(int64_t rid); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 58e40668a2..d2726201cc 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -497,7 +497,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs do { SyncIndex myLastIndex = syncNodeGetLastIndex(ths); SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && syncNodeHasSnapshot(ths); @@ -710,7 +710,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { // advance commit index to sanpshot first SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { SyncIndex commitBegin = ths->commitIndex; SyncIndex commitEnd = snapshot.lastApplyIndex; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5e5e23b312..47fd23baae 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -123,7 +123,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); if (gRaftDetailLog) { SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", snapshot.lastApplyIndex, snapshot.lastApplyTerm); } @@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pSender != NULL); bool hasSnapshot = syncNodeHasSnapshot(ths); SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); // start sending snapshot first time // start here, stop by receiver @@ -209,7 +209,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); if (gRaftDetailLog) { SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", snapshot.lastApplyIndex, snapshot.lastApplyTerm); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 738cbd3c2b..ec3e1ab2ba 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -50,7 +50,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // advance commit index to sanpshot first SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) { SyncIndex commitBegin = pSyncNode->commitIndex; SyncIndex commitEnd = snapshot.lastApplyIndex; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7fb3ba4ac0..e13271341d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -809,9 +809,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->restoreFinish = false; // pSyncNode->pSnapshot = NULL; - // if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + // if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { // pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); - // pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + // pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot); // } // tsem_init(&(pSyncNode->restoreSem), 0, 0); @@ -1658,8 +1658,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool ret = false; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { ret = true; } @@ -1669,19 +1669,19 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(syncNodeHasSnapshot(pSyncNode)); - ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); + ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL); ASSERT(index >= SYNC_INDEX_BEGIN); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); bool b = (index <= snapshot.lastApplyIndex); return b; } SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -1694,8 +1694,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) { if (syncNodeHasSnapshot(pSyncNode)) { // has snapshot SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -1747,8 +1747,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { if (syncNodeHasSnapshot(pSyncNode)) { // has snapshot SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } if (index > snapshot.lastApplyIndex + 1) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 08564f8293..f044ae5733 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -124,7 +124,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); if (gRaftDetailLog) { SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", snapshot.lastApplyIndex, snapshot.lastApplyTerm); } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d1bfc39d1e..029891a692 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->privateTerm = taosGetTimestampMs() + 100; - pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { sError("snapshotSenderCreate cannot create sender"); @@ -84,7 +84,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->blockLen = 0; // get current snapshot info - pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld", pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); @@ -558,7 +558,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); do { char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish"); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 3ee2b458d6..6fd6944273 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -36,9 +36,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -159,7 +159,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpSnapshotStartRead = SnapshotStartRead; pFsm->FpSnapshotStopRead = SnapshotStopRead; diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index b1dc53d804..a185870a3b 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -35,9 +35,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -90,7 +90,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpReConfigCb = ReConfigCb; diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index 64e1da51a1..32ff441a6f 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -54,7 +54,7 @@ void init() { pSyncNode->pWal = pWal; pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); - pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; + pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; } void cleanup() { diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index b47f8c96c5..c81a6d3ca9 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -54,7 +54,7 @@ void init() { pSyncNode->pWal = pWal; pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); - pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; + pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; } void cleanup() { @@ -80,7 +80,7 @@ void test1() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -146,7 +146,7 @@ void test2() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -203,7 +203,7 @@ void test3() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -268,7 +268,7 @@ void test4() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -335,7 +335,7 @@ void test5() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index d955e64f81..66f347b620 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -32,9 +32,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -75,7 +75,7 @@ SSyncFSM* createFsm() { pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; return pFsm; } diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index 404ba2acae..04a02460af 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -40,7 +40,7 @@ SSyncSnapshotSender* createSender() { pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead; pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead; pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead; - pSyncNode->pFsm->FpGetSnapshot = GetSnapshot; + pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot; SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2); pSender->start = true; diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 820500e2d8..954bdd8ea5 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -35,9 +35,9 @@ const char *pWalDir = "./syncSnapshotTest_wal"; void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -79,7 +79,7 @@ void initFsm() { pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; } SSyncNode *syncNodeInit() { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 1eac372bd4..74dfaa192a 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -172,7 +172,7 @@ SSyncFSM* createFsm() { pFsm->FpRollBackCb = RollBackCb; pFsm->FpReConfigCb = ReConfigCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpSnapshotStartRead = SnapshotStartRead; From a79f4a6ec77348a4711493e43efeabb010fcc80f Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 20 Jun 2022 11:25:54 +0800 Subject: [PATCH 4/6] fix(query): fix stddev function floating point NaN value TD-16656 --- source/libs/function/src/builtinsimpl.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 55900c7c7e..ed6bb7005c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1745,10 +1745,10 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (IS_INTEGER_TYPE(type)) { avg = pStddevRes->isum / ((double)pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); + pStddevRes->result = sqrt(fabs(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg)); } else { avg = pStddevRes->dsum / ((double)pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg); + pStddevRes->result = sqrt(fabs(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg)); } return functionFinalize(pCtx, pBlock); @@ -5368,4 +5368,4 @@ int32_t interpFunction(SqlFunctionCtx *pCtx) { #endif return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} From 5301dece4e0262bb84f51212456058c11f0a3ddb Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 20 Jun 2022 11:25:28 +0800 Subject: [PATCH 5/6] feat(stream): add ci --- source/libs/executor/src/timewindowoperator.c | 5 +- source/libs/function/src/builtinsimpl.c | 2 + tests/script/jenkins/basic.txt | 1 + .../script/tsim/stream/distributesession0.sim | 58 +++++++++++++++++++ 4 files changed, 65 insertions(+), 1 deletion(-) create mode 100644 tests/script/tsim/stream/distributesession0.sim diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f8650d95c1..c556e94c74 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2892,7 +2892,10 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId); int32_t chWinSize = taosArrayGetSize(pChWins); int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey); - for (int32_t k = index; k > 0 && k < chWinSize; k++) { + if (index < 0) { + index = 0; + } + for (int32_t k = index; k < chWinSize; k++) { SResultWindowInfo* pcw = taosArrayGet(pChWins, k); if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { SResultRow* pChResult = NULL; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 583e8bd300..15e1d84673 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1481,11 +1481,13 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) { *(double*)&pDBuf->v = *(double*)&pSBuf->v; replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); + pDBuf->assign = true; } } else { if (pSBuf->assign && (((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign)) { pDBuf->v = pSBuf->v; replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); + pDBuf->assign = true; } } pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a8ec0f5151..720625a570 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -73,6 +73,7 @@ ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim # ./test.sh -f tsim/stream/distributeInterval0.sim +# ./test.sh -f tsim/stream/distributesession0.sim # ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/state0.sim diff --git a/tests/script/tsim/stream/distributesession0.sim b/tests/script/tsim/stream/distributesession0.sim new file mode 100644 index 0000000000..78f65ed8a3 --- /dev/null +++ b/tests/script/tsim/stream/distributesession0.sim @@ -0,0 +1,58 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t1 trigger at_once into streamtST as select _wstartts, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ; + +sleep 1000 + +sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1); +sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3); +sql insert into ts1 values(1648791211005,1,1,1); +sql insert into ts2 values(1648791221006,5,5,5) (1648791221007,5,5,5); +sql insert into ts2 values(1648791221008,5,5,5) (1648791221008,5,5,5)(1648791221006,5,5,5); +sql insert into ts1 values(1648791231000,1,1,1) (1648791231002,1,1,1) (1648791231006,1,1,1); +sql insert into ts1 values(1648791211000,6,6,6) (1648791231002,2,2,2); +sql insert into ts1 values(1648791211002,7,7,7); +sql insert into ts1 values(1648791211002,7,7,7) ts2 values(1648791221008,5,5,5) ; + +$loop_count = 0 +loop1: +sql select * from streamtST; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 10 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 34 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 7 then + print ======$data03 + return -1 +endi + +system sh/stop_dnodes.sh \ No newline at end of file From 9e8058dd5dd67c28dfe9eeb71376c05ed17ca200 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 20 Jun 2022 12:00:57 +0800 Subject: [PATCH 6/6] feat: sql function 'interp' --- include/libs/nodes/plannodes.h | 6 ++ include/util/taoserror.h | 1 + source/libs/nodes/src/nodesCloneFuncs.c | 3 + source/libs/nodes/src/nodesCodeFuncs.c | 21 ++++ source/libs/parser/src/parAstCreater.c | 15 +-- source/libs/parser/src/parTranslater.c | 113 ++++++++++++++++----- source/libs/parser/src/parUtil.c | 2 + source/libs/parser/test/parSelectTest.cpp | 2 - source/libs/planner/src/planLogicCreater.c | 13 ++- source/libs/planner/src/planPhysiCreater.c | 6 ++ 10 files changed, 147 insertions(+), 35 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 38a179869f..b07e8f39d5 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; @@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; typedef struct SJoinPhysiNode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index fde016375b..9fb9bcebf7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -564,6 +564,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A) #define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B) #define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C) +#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b12f910cb1..5eaca60ea5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL CLONE_NODE_LIST_FIELD(pFuncs); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(fillMode); + CLONE_NODE_FIELD(pFillValues); + CLONE_NODE_FIELD(pTimeSeries); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57d1ee7857..bde5159087 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanInterval = "Interval"; +static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; +static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; +static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); + } return code; } @@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6b3fdad27d..ccb0d37da6 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) { SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { CHECK_PARSER_STATUS(pCxt); - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { - ((SSelectStmt*)pStmt)->pFill = pFill; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) { + SFillNode* pFillClause = (SFillNode*)pFill; + nodesDestroyNode(pFillClause->pWStartTs); + pFillClause->pWStartTs = createPrimaryKeyCol(pCxt); + ((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause; } return pStmt; } @@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); pOptions->ttl = -1; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; break; - case TABLE_OPTION_TTL:{ + case TABLE_OPTION_TTL: { int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10); - if (ttl > INT32_MAX){ + if (ttl > INT32_MAX) { ttl = INT32_MAX; } // ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 77ef9027cd..a60dba3a9a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin return code; } -static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { - SFillNode* pFill = (SFillNode*)pInterval->pFill; +static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) { + if (FILL_MODE_NONE == pFill->mode) { + return TSDB_CODE_SUCCESS; + } + if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } - int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); - int64_t intervalRange = 0; - SValueNode* pInter = (SValueNode*)pInterval->pInterval; - if (TIME_IS_VAR_DURATION(pInter->unit)) { + // interp FILL clause + if (NULL == pInterval) { + return TSDB_CODE_SUCCESS; + } + + int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); + int64_t intervalRange = 0; + if (TIME_IS_VAR_DURATION(pInterval->unit)) { int64_t f = 1; - if (pInter->unit == 'n') { + if (pInterval->unit == 'n') { f = 30L * MILLISECOND_PER_DAY; - } else if (pInter->unit == 'y') { + } else if (pInterval->unit == 'y') { f = 365L * MILLISECOND_PER_DAY; } - intervalRange = pInter->datum.i * f; + intervalRange = pInterval->datum.i * f; } else { - intervalRange = pInter->datum.i; + intervalRange = pInterval->datum.i; } if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); @@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange)); if (TSDB_CODE_SUCCESS == code) { - code = checkFill(pCxt, pInterval); + code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval); } return code; } @@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { return code; } +static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { + SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL); + if (NULL == pFill) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pFill->mode = FILL_MODE_NONE; + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + nodesDestroyNode((SNode*)pFill); + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); + pFill->pWStartTs = (SNode*)pCol; + + *pOutput = (SNode*)pFill; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == pSelect->pFill) { + code = createDefaultFillNode(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange)); + } + if (TSDB_CODE_SUCCESS == code) { + code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery); + } + + return code; +} + +static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pSelect->hasInterpFunc) { + if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE); + } + return TSDB_CODE_SUCCESS; + } + + int32_t code = translateExpr(pCxt, &pSelect->pRange); + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pEvery); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterpFill(pCxt, pSelect); + } + return code; +} + static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; return translateExprList(pCxt, pPartitionByList); @@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = checkLimit(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterp(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = rewriteUniqueStmt(pCxt, pSelect); } @@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark2 = pStmt->pOptions->watermark2; -// pReq->ttl = pStmt->pOptions->ttl; + // pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfTags = LIST_LENGTH(pStmt->pTags); - if(pStmt->pOptions->commentNull == false){ + if (pStmt->pOptions->commentNull == false) { pReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pReq->commentLen = -1; } @@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pAlterReq->alterType = pStmt->alterType; if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { -// pAlterReq->ttl = pStmt->pOptions->ttl; + // pAlterReq->ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { pAlterReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pAlterReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pAlterReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pAlterReq->commentLen = -1; } @@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_OUT_OF_MEMORY; } req.commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { req.commentLen = -1; } req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); @@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(pStmt->tableName); - req.ttl = pStmt->pOptions->ttl; + req.ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { req.comment = strdup(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment); - } else{ + } else { req.commentLen = -1; } req.ctb.suid = suid; @@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p pReq->newTTL = pStmt->pOptions->ttl; } - if (TSDB_CODE_SUCCESS == code){ - if(pStmt->pOptions->commentNull == false) { + if (TSDB_CODE_SUCCESS == code) { + if (pStmt->pOptions->commentNull == false) { pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) { code = TSDB_CODE_OUT_OF_MEMORY; } pReq->newCommentLen = strlen(pReq->newComment); - } - else{ + } else { pReq->newCommentLen = -1; } } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 95cdf58d5a..d94b430c45 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function does not supportted in group query"; case TSDB_CODE_PAR_INVALID_TABLE_OPTION: return "Invalid option %s"; + case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: + return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 693c422ae0..11e09cd83d 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) { run("SELECT INTERP(c1) FROM t1 EVERY(5s)"); - run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)"); - run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b365afff02..c7490faa0c 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); } - if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) { - // SRangeNode* pRange = (SRangeNode*)pSelect->pRange; - // pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i; - // pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i; + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { + SFillNode* pFill = (SFillNode*)pSelect->pFill; + pInterpFunc->timeRange = pFill->timeRange; + pInterpFunc->fillMode = pFill->mode; + pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs); + pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 70a8aca8bd..6af5a15147 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh if (TSDB_CODE_SUCCESS == code) { pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->interval = pFuncLogicNode->interval; + pInterpFunc->fillMode = pFuncLogicNode->fillMode; + pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues); + pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code) {