From ac177a8a76e6614059d7a01f767141742e202417 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Sat, 7 May 2022 19:52:36 +0800 Subject: [PATCH 01/14] test: add test case for taosd_Monitor --- tests/system-test/0-others/taosdMonitor.py | 313 +++++++++++++++++++++ tests/system-test/fulltest.sh | 5 +- 2 files changed, 315 insertions(+), 3 deletions(-) create mode 100644 tests/system-test/0-others/taosdMonitor.py diff --git a/tests/system-test/0-others/taosdMonitor.py b/tests/system-test/0-others/taosdMonitor.py new file mode 100644 index 0000000000..70e7407171 --- /dev/null +++ b/tests/system-test/0-others/taosdMonitor.py @@ -0,0 +1,313 @@ +import taos +import sys +import time +import socket +import pexpect +import os +import http.server +import gzip +import threading +import json + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +telemetryPort = '6043' + + +def telemetryInfoCheck(infoDict=''): + + hostname = socket.gethostname() + serverPort = 7080 + + if "ts" not in infoDict or len(infoDict["ts"]) == 0: + tdLog.exit("ts is null!") + + if "dnode_id" not in infoDict or infoDict["dnode_id"] != 1: + tdLog.exit("dnode_id is null!") + + if "dnode_ep" not in infoDict: + tdLog.exit("dnode_ep is null!") + + if "cluster_id" not in infoDict: + tdLog.exit("cluster_id is null!") + + if "protocol" not in infoDict or infoDict["protocol"] != 1: + tdLog.exit("protocol is null!") + + if "cluster_info" not in infoDict : + tdLog.exit("cluster_info is null!") + + # cluster_info ==================================== + + if "first_ep" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep"] == None: + tdLog.exit("first_ep is null!") + + if "first_ep_dnode_id" not in infoDict["cluster_info"] or infoDict["cluster_info"]["first_ep_dnode_id"] != 1: + tdLog.exit("first_ep_dnode_id is null!") + + if "version" not in infoDict["cluster_info"] or infoDict["cluster_info"]["version"] == None: + tdLog.exit("first_ep_dnode_id is null!") + + if "master_uptime" not in infoDict["cluster_info"] or infoDict["cluster_info"]["master_uptime"] == None: + tdLog.exit("master_uptime is null!") + + if "monitor_interval" not in infoDict["cluster_info"] or infoDict["cluster_info"]["monitor_interval"] != 5: + tdLog.exit("monitor_interval is null!") + + if "vgroups_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_total"] != 2: + tdLog.exit("vgroups_total is null!") + + if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] != 2: + tdLog.exit("vgroups_alive is null!") + + if "connections_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["connections_total"] != 1: + tdLog.exit("connections_total is null!") + + if "dnodes" not in infoDict["cluster_info"] or infoDict["cluster_info"]["dnodes"] == None : + tdLog.exit("dnodes is null!") + + dnodes_info = { "dnode_id": 1,"dnode_ep": f"{hostname}:{serverPort}","status":"ready"} + + for k ,v in dnodes_info.items(): + if k not in infoDict["cluster_info"]["dnodes"][0] or v != infoDict["cluster_info"]["dnodes"][0][k] : + tdLog.exit("dnodes info is null!") + + mnodes_info = { "mnode_id":1, "mnode_ep":f"{hostname}:{serverPort}","role": "LEADER" } + + for k ,v in mnodes_info.items(): + if k not in infoDict["cluster_info"]["mnodes"][0] or v != infoDict["cluster_info"]["mnodes"][0][k] : + tdLog.exit("mnodes info is null!") + + # vgroup_infos ==================================== + + if "vgroup_infos" not in infoDict or infoDict["vgroup_infos"]== None: 
+ tdLog.exit("vgroup_infos is null!") + + vgroup_infos_lists = [{ "vgroup_id":2, "database_name":"db", "tables_num":0,"status": "ready", "vnodes": [{"dnode_id":1,"vnode_role": "LEADER" }]}, + { "vgroup_id":3, "database_name":"db", "tables_num": 0,"status":"ready", "vnodes": [{"dnode_id":1,"vnode_role": "LEADER" }]} + ] + + for index , vgroup_infos in enumerate(vgroup_infos_lists): + if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0: + tdLog.exit("vgroup_id is null!") + if "database_name" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["database_name"]!="db": + tdLog.exit("database_name is null!") + if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0: + tdLog.exit("tables_num is null!") + if "status" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["status"]!= "ready": + tdLog.exit("status is null!") + if "vnodes" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vnodes"] ==None : + tdLog.exit("vnodes is null!") + if "dnode_id" not in infoDict["vgroup_infos"][index]["vnodes"][0] or infoDict["vgroup_infos"][index]["vnodes"][0]["dnode_id"] !=1 : + tdLog.exit("vnodes is null!") + + # grant_info ==================================== + + if "grant_info" not in infoDict or infoDict["grant_info"]== None: + tdLog.exit("grant_info is null!") + + if "expire_time" not in infoDict["grant_info"] or not infoDict["grant_info"]["expire_time"] > 0: + tdLog.exit("expire_time is null!") + + if "timeseries_used" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_used"] > 0: + tdLog.exit("timeseries_used is null!") + + if "timeseries_total" not in infoDict["grant_info"] or not infoDict["grant_info"]["timeseries_total"] > 0: + tdLog.exit("timeseries_total is null!") + + # dnode_info ==================================== + + if "dnode_info" not in infoDict or infoDict["dnode_info"]== None: + tdLog.exit("dnode_info is null!") + + dnode_infos = ['uptime', 'cpu_engine', 'cpu_system', 'cpu_cores', 'mem_engine', 'mem_system', 'mem_total', 'disk_engine', + 'disk_used', 'disk_total', 'net_in', 'net_out', 'io_read', 'io_write', 'io_read_disk', 'io_write_disk', 'req_select', + 'req_select_rate', 'req_insert', 'req_insert_success', 'req_insert_rate', 'req_insert_batch', 'req_insert_batch_success', + 'req_insert_batch_rate', 'errors', 'vnodes_num', 'masters', 'has_mnode', 'has_qnode', 'has_snode', 'has_bnode'] + for elem in dnode_infos: + if elem not in infoDict["dnode_info"] or infoDict["dnode_info"][elem] < 0: + tdLog.exit(f"{elem} is null!") + + # dnode_info ==================================== + + if "disk_infos" not in infoDict or infoDict["disk_infos"]== None: + tdLog.exit("disk_infos is null!") + + # # bug for data_dir + # if "datadir" not in infoDict["disk_infos"] or len(infoDict["disk_infos"]["datadir"]) <=0 : + # tdLog.exit("datadir is null!") + + # if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0: + # tdLog.exit("name is null!") + + # if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] <= 0: + # tdLog.exit("level is null!") + + # if "avail" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0: + # tdLog.exit("avail is null!") + + # if "used" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["used"] <= 0: + # 
tdLog.exit("used is null!") + + # if "total" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["total"] <= 0: + # tdLog.exit("total is null!") + + + if "logdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["logdir"]== None: + tdLog.exit("logdir is null!") + + if "name" not in infoDict["disk_infos"]["logdir"] or len(infoDict["disk_infos"]["logdir"]["name"]) <= 0: + tdLog.exit("name is null!") + + if "avail" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["avail"] <= 0: + tdLog.exit("avail is null!") + + if "used" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["used"] <= 0: + tdLog.exit("used is null!") + + if "total" not in infoDict["disk_infos"]["logdir"] or infoDict["disk_infos"]["logdir"]["total"] <= 0: + tdLog.exit("total is null!") + + + + if "tempdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["tempdir"]== None: + tdLog.exit("tempdir is null!") + + if "name" not in infoDict["disk_infos"]["tempdir"] or len(infoDict["disk_infos"]["tempdir"]["name"]) <= 0: + tdLog.exit("name is null!") + + if "avail" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["avail"] <= 0: + tdLog.exit("avail is null!") + + if "used" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["used"] <= 0: + tdLog.exit("used is null!") + + if "total" not in infoDict["disk_infos"]["tempdir"] or infoDict["disk_infos"]["tempdir"]["total"] <= 0: + tdLog.exit("total is null!") + + + # log_infos ==================================== + + if "log_infos" not in infoDict or infoDict["log_infos"]== None: + tdLog.exit("log_infos is null!") + + if "logs" not in infoDict["log_infos"] or len(infoDict["log_infos"]["logs"])!= 10: + tdLog.exit("logs is null!") + + if "ts" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 10: + tdLog.exit("ts is null!") + + if "level" not in infoDict["log_infos"]["logs"][0] or infoDict["log_infos"]["logs"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]: + tdLog.exit("level is null!") + + if "content" not in infoDict["log_infos"]["logs"][0] or len(infoDict["log_infos"]["logs"][0]["ts"]) <= 1: + tdLog.exit("content is null!") + + if "summary" not in infoDict["log_infos"] or len(infoDict["log_infos"]["summary"])!= 4: + tdLog.exit("summary is null!") + + + if "total" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["total"] < 0 : + tdLog.exit("total is null!") + + if "level" not in infoDict["log_infos"]["summary"][0] or infoDict["log_infos"]["summary"][0]["level"] not in ["error" ,"info" , "debug" ,"trace"]: + tdLog.exit("level is null!") + +class RequestHandlerImpl(http.server.BaseHTTPRequestHandler): + def do_GET(self): + """ + process GET request + """ + + def do_POST(self): + """ + process POST request + """ + contentEncoding = self.headers["Content-Encoding"] + + if contentEncoding == 'gzip': + req_body = self.rfile.read(int(self.headers["Content-Length"])) + plainText = gzip.decompress(req_body).decode() + else: + plainText = self.rfile.read(int(self.headers["Content-Length"])).decode() + + print(plainText) + # 1. send response code and header + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + + # 2. send response content + #self.wfile.write(("Hello World: " + req_body + "\n").encode("utf-8")) + + # 3. 
check request body info + infoDict = json.loads(plainText) + #print("================") + # print(infoDict) + telemetryInfoCheck(infoDict) + + # 4. shutdown the server and exit case + assassin = threading.Thread(target=httpServer.shutdown) + assassin.daemon = True + assassin.start() + print ("==== shutdown http server ====") + +class TDTestCase: + hostname = socket.gethostname() + serverPort = '7080' + rpcDebugFlagVal = '143' + clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + clientCfgDict["serverPort"] = serverPort + clientCfgDict["firstEp"] = hostname + ':' + serverPort + clientCfgDict["secondEp"] = hostname + ':' + serverPort + clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + clientCfgDict["fqdn"] = hostname + + updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + updatecfgDict["clientCfg"] = clientCfgDict + updatecfgDict["serverPort"] = serverPort + updatecfgDict["firstEp"] = hostname + ':' + serverPort + updatecfgDict["secondEp"] = hostname + ':' + serverPort + updatecfgDict["fqdn"] = hostname + + updatecfgDict["monitorFqdn"] = hostname + updatecfgDict["monitorPort"] = '6043' + updatecfgDict["monitor"] = '1' + updatecfgDict["monitorInterval"] = "5" + updatecfgDict["monitorMaxLogs"] = "10" + updatecfgDict["monitorComp"] = "1" + + print ("===================: ", updatecfgDict) + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring + tdSql.prepare() + # time.sleep(2) + vgroups = "30" + sql = "create database db3 vgroups " + vgroups + tdSql.query(sql) + + # loop to wait request + httpServer.serve_forever() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +# create http server: bing ip/port , and request processor +serverAddress = ("", int(telemetryPort)) +httpServer = http.server.HTTPServer(serverAddress, RequestHandlerImpl) + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) + + + + + diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index cb9d472116..4125c3866b 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -6,7 +6,7 @@ python3 ./test.py -f 0-others/taosShell.py python3 ./test.py -f 0-others/taosShellError.py python3 ./test.py -f 0-others/taosShellNetChk.py python3 ./test.py -f 0-others/telemetry.py - +python3 ./test.py -f 0-others/taosdMonitor.py #python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/distinct.py @@ -23,8 +23,7 @@ python3 ./test.py -f 2-query/last.py python3 ./test.py -f 2-query/To_unixtimestamp.py python3 ./test.py -f 2-query/timetruncate.py -python3 ./test.py -f 2-query/Timediff.py -# python3 ./test.py -f 2-query/diff.py +# python3 ./test.py -f 2-query/Timediff.py #python3 ./test.py -f 2-query/cast.py From 4cd726362411383f40aef0ca6c8a64aea9a1e44c Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Sat, 7 May 2022 20:14:27 +0800 Subject: [PATCH 02/14] test: add testcase of horizontal expansion --- .../1-insert/insertWithMoreVgroup.py | 68 +++- tests/system-test/1-insert/manyVgroups.json | 76 +++++ .../1-insert/mutipythonnodebugtaosd.py | 299 ++++++++++++++++++ 3 files changed, 438 insertions(+), 5 deletions(-) create mode 100644 tests/system-test/1-insert/manyVgroups.json create mode 100644 tests/system-test/1-insert/mutipythonnodebugtaosd.py diff --git 
a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index a7d17bc41e..b583ee93e8 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -12,6 +12,7 @@ # -*- coding: utf-8 -*- import sys +import os import threading import multiprocessing as mp from numpy.lib.function_base import insert @@ -66,14 +67,19 @@ class TDTestCase: # run case def run(self): - # test base case - self.test_case1() - tdLog.debug(" LIMIT test_case1 ............ [OK]") + # # test base case + # self.test_case1() + # tdLog.debug(" LIMIT test_case1 ............ [OK]") - # test advance case + # test case # self.test_case2() # tdLog.debug(" LIMIT test_case2 ............ [OK]") + # test case + self.test_case3() + tdLog.debug(" LIMIT test_case3 ............ [OK]") + + # stop def stop(self): tdSql.close() @@ -115,11 +121,12 @@ class TDTestCase: return cur def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop): - host = "chenhaoran02" + host = "localhost" buildPath = self.getBuildPath() config = buildPath+ "../sim/dnode1/cfg/" tsql=self.newcur(host,config) + tsql.execute("drop database if exists %s"%dbname) tsql.execute("create database %s vgroups %d"%(dbname,vgroups)) tsql.execute("use %s" %dbname) tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) @@ -182,7 +189,52 @@ class TDTestCase: tdLog.debug("INSERT TABLE DATA ............ [OK]") return + def taosBench(self,jsonFile): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + taosBenchbin = buildPath+ "/build/bin/taosBenchmark" + os.system("%s -f %s -y " %(taosBenchbin,jsonFile)) + + return + def taosBenchCreate(self,dbname,stbname,vgroups,threadNumbers,count): + # count=50000 + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + taosBenchbin = buildPath+ "/build/bin/taosBenchmark" + # insert: create one or mutiple tables per sql and insert multiple rows per sql + tdSql.execute("drop database if exists %s"%dbname) + + tdSql.execute("create database %s vgroups %d"%(dbname,vgroups)) + tdSql.execute("use %s" %dbname) + + threads = [] + # threadNumbers=2 + for i in range(threadNumbers): + jsonfile="1-insert/Vgroups%d%d.json"%(vgroups,i) + os.system("cp -f 1-insert/manyVgroups.json %s"%(jsonfile)) + os.system("sed -i 's/\"name\": \"db\",/\"name\": \"%s%d\",/g' %s"%(dbname,i,jsonfile)) + os.system("sed -i 's/\"childtable_count\": 300000,/\"childtable_count\": %d,/g' %s "%(count,jsonfile)) + os.system("sed -i 's/\"name\": \"stb1\",/\"name\": \"%s%d\",/g' %s "%(stbname,i,jsonfile)) + os.system("sed -i 's/\"childtable_prefix\": \"stb1_\",/\"childtable_prefix\": \"%s%d_\",/g' %s "%(stbname,i,jsonfile)) + threads.append(mp.Process(target=self.taosBench, args=("%s"%jsonfile,))) + start_time = time.time() + for tr in threads: + tr.start() + for tr in threads: + tr.join() + end_time = time.time() + + spendTime=end_time-start_time + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... 
[OK]"% (spendTime,count,speedCreate)) + return # test case1 base def test_case1(self): tdLog.debug("-----create database and tables test------- ") @@ -284,6 +336,12 @@ class TDTestCase: return + def test_case3(self): + + self.taosBenchCreate("db1", "stb1", 1, 2, 1*50000) + + return + # # add case with filename # diff --git a/tests/system-test/1-insert/manyVgroups.json b/tests/system-test/1-insert/manyVgroups.json new file mode 100644 index 0000000000..df6f1163e8 --- /dev/null +++ b/tests/system-test/1-insert/manyVgroups.json @@ -0,0 +1,76 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos/", + "host": "test216", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 8, + "thread_count_create_tbl": 8, + "result_file": "./insert_res.txt", + "confirm_parameter_prompt": "no", + "insert_interval": 0, + "interlace_rows": 100000, + "num_of_records_per_req": 100000, + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "yes", + "vgroups": 1 + }, + "super_tables": [ + { + "name": "stb1", + "child_table_exists": "no", + "childtable_count": 300000, + "childtable_prefix": "stb1_", + "auto_create_table": "no", + "batch_create_tbl_num": 50000, + "data_source": "rand", + "insert_mode": "taosc", + "insert_rows": 0, + "interlace_rows": 0, + "insert_interval": 0, + "max_sql_len": 10000000, + "disorder_ratio": 0, + "disorder_range": 1000, + "timestamp_step": 10, + "sample_format": "csv", + "use_sample_ts": "no", + "tags_file": "", + "columns": [ + { + "type": "INT" + }, + { + "type": "DOUBLE", + "count": 100 + }, + { + "type": "BINARY", + "len": 400, + "count": 10 + }, + { + "type": "nchar", + "len": 200, + "count": 20 + } + ], + "tags": [ + { + "type": "TINYINT", + "count": 2 + }, + { + "type": "BINARY", + "len": 16, + "count": 2 + } + ] + } + ] + } + ] +} \ No newline at end of file diff --git a/tests/system-test/1-insert/mutipythonnodebugtaosd.py b/tests/system-test/1-insert/mutipythonnodebugtaosd.py new file mode 100644 index 0000000000..73d70b4348 --- /dev/null +++ b/tests/system-test/1-insert/mutipythonnodebugtaosd.py @@ -0,0 +1,299 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + + +import sys +import os +selfPath = os.path.dirname(os.path.realpath(__file__)) +utilPath="%s/../../pytest/"%selfPath +import threading +import multiprocessing as mp +from numpy.lib.function_base import insert +import taos +sys.path.append(utilPath) +from util.log import * +from util.cases import * +from util.sql import * +import numpy as np +import datetime as dt +import time +# constant define +WAITS = 5 # wait seconds + +class TDTestCase: + # + # --------------- main frame ------------------- + # + + def caseDescription(self): + ''' + limit and offset keyword function test cases; + case1: limit offset base function test + case2: offset return valid + ''' + return + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + # init + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + # tdSql.init(conn.cursor()) + # tdSql.prepare() + # self.create_tables(); + self.ts = 1500000000000 + + + # run case + def run(self): + + # test base case + self.test_case1() + tdLog.debug(" LIMIT test_case1 ............ [OK]") + + # test advance case + # self.test_case2() + # tdLog.debug(" LIMIT test_case2 ............ [OK]") + + # stop + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + # --------------- case ------------------- + + # create tables + def create_tables(self,dbname,stbname,count): + tdSql.execute("use %s" %dbname) + tdSql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) + pre_create = "create table" + sql = pre_create + tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + # print(time.time()) + exeStartTime=time.time() + for i in range(count): + sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) + if i >0 and i%3000 == 0: + tdSql.execute(sql) + sql = pre_create + # print(time.time()) + # end sql + if sql != pre_create: + tdSql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... 
[OK]"% (spendTime,count,speedCreate)) + return + + def newcur(self,host,cfg): + user = "root" + password = "taosdata" + port =6030 + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def new_create_tables(self,dbname,vgroups,stbname,tcountStart,tcountStop): + host = "127.0.0.1" + buildPath = self.getBuildPath() + config = buildPath+ "../sim/dnode1/cfg/" + + tsql=self.newcur(host,config) + tsql.execute("drop database if exists %s" %(dbname)) + tsql.execute("create database if not exists %s vgroups %d"%(dbname,vgroups)) + tsql.execute("use %s" %dbname) + tsql.execute("create stable %s(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)"%stbname) + + pre_create = "create table" + sql = pre_create + tcountStop=int(tcountStop) + tcountStart=int(tcountStart) + count=tcountStop-tcountStart + + tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + # print(time.time()) + exeStartTime=time.time() + # print(type(tcountStop),type(tcountStart)) + for i in range(tcountStart,tcountStop): + sql += " %s_%d using %s tags(%d)"%(stbname,i,stbname,i+1) + if i >0 and i%20000 == 0: + # print(sql) + tsql.execute(sql) + sql = pre_create + # print(time.time()) + # end sql + if sql != pre_create: + # print(sql) + tsql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedCreate=count/spendTime + # tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) + return + + + + # insert data + def insert_data(self, dbname, stbname, ts_start, tcountStart,tcountStop,rowCount): + tdSql.execute("use %s" %dbname) + pre_insert = "insert into " + sql = pre_insert + tcount=tcountStop-tcountStart + allRows=tcount*rowCount + tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbname, allRows)) + exeStartTime=time.time() + for i in range(tcountStart,tcountStop): + sql += " %s_%d values "%(stbname,i) + for j in range(rowCount): + sql += "(%d, %d, 'taos_%d') "%(ts_start + j*1000, j, j) + if j >0 and j%5000 == 0: + # print(sql) + tdSql.execute(sql) + sql = "insert into %s_%d values " %(stbname,i) + # end sql + if sql != pre_insert: + # print(sql) + tdSql.execute(sql) + exeEndTime=time.time() + spendTime=exeEndTime-exeStartTime + speedInsert=allRows/spendTime + # tdLog.debug("spent %.2fs to INSERT %d rows , insert rate is %.2f rows/s... [OK]"% (spendTime,allRows,speedInsert)) + + tdLog.debug("INSERT TABLE DATA ............ 
[OK]") + return + + + # test case1 base + def test_case1(self): + tdLog.debug("-----create database and tables test------- ") + # tdSql.execute("drop database if exists db1") + # tdSql.execute("drop database if exists db4") + # tdSql.execute("drop database if exists db6") + # tdSql.execute("drop database if exists db8") + # tdSql.execute("drop database if exists db12") + # tdSql.execute("drop database if exists db16") + + #create database and tables; + + # tdSql.execute("create database db11 vgroups 1") + # # self.create_tables("db1", "stb1", 30*10000) + # tdSql.execute("use db1") + # tdSql.execute("create stable stb1(ts timestamp, c1 int, c2 binary(10)) tags(t1 int)") + + # tdSql.execute("create database db12 vgroups 1") + # # self.create_tables("db1", "stb1", 30*10000) + # tdSql.execute("use db1") + + # t1 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(1,)) + # t2 = threading.Thread(target=self.new_create_tables("db1", "stb1", 15*10000), args=(2,)) + # t1 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", 0,count/2,)) + # t2 = mp.Process(target=self.new_create_tables, args=("db1", "stb1", count/2,count,)) + + count=500000 + vgroups=1 + threads = [] + threadNumbers=2 + for i in range(threadNumbers): + threads.append(mp.Process(target=self.new_create_tables, args=("db1%d"%i, vgroups, "stb1", 0,count,))) + start_time = time.time() + for tr in threads: + tr.start() + for tr in threads: + tr.join() + end_time = time.time() + spendTime=end_time-start_time + speedCreate=count/spendTime + tdLog.debug("spent %.2fs to create 1 stable and %d table, create speed is %.2f table/s... [OK]"% (spendTime,count,speedCreate)) + # self.new_create_tables("db1", "stb1", 15*10000) + # self.new_create_tables("db1", "stb1", 15*10000) + + # tdSql.execute("create database db4 vgroups 4") + # self.create_tables("db4", "stb4", 30*10000) + + # tdSql.execute("create database db6 vgroups 6") + # self.create_tables("db6", "stb6", 30*10000) + + # tdSql.execute("create database db8 vgroups 8") + # self.create_tables("db8", "stb8", 30*10000) + + # tdSql.execute("create database db12 vgroups 12") + # self.create_tables("db12", "stb12", 30*10000) + + # tdSql.execute("create database db16 vgroups 16") + # self.create_tables("db16", "stb16", 30*10000) + return + + # test case2 base:insert data + def test_case2(self): + + tdLog.debug("-----insert data test------- ") + # drop database + tdSql.execute("drop database if exists db1") + tdSql.execute("drop database if exists db4") + tdSql.execute("drop database if exists db6") + tdSql.execute("drop database if exists db8") + tdSql.execute("drop database if exists db12") + tdSql.execute("drop database if exists db16") + + #create database and tables; + + tdSql.execute("create database db1 vgroups 1") + self.create_tables("db1", "stb1", 1*100) + self.insert_data("db1", "stb1", self.ts, 1*50,1*10000) + + + tdSql.execute("create database db4 vgroups 4") + self.create_tables("db4", "stb4", 1*100) + self.insert_data("db4", "stb4", self.ts, 1*100,1*10000) + + tdSql.execute("create database db6 vgroups 6") + self.create_tables("db6", "stb6", 1*100) + self.insert_data("db6", "stb6", self.ts, 1*100,1*10000) + + tdSql.execute("create database db8 vgroups 8") + self.create_tables("db8", "stb8", 1*100) + self.insert_data("db8", "stb8", self.ts, 1*100,1*10000) + + tdSql.execute("create database db12 vgroups 12") + self.create_tables("db12", "stb12", 1*100) + self.insert_data("db12", "stb12", self.ts, 1*100,1*10000) + + tdSql.execute("create database 
db16 vgroups 16") + self.create_tables("db16", "stb16", 1*100) + self.insert_data("db16", "stb16", self.ts, 1*100,1*10000) + + return + +# +# add case with filename +# +# tdCases.addWindows(__file__, TDTestCase()) +# tdCases.addLinux(__file__, TDTestCase()) +case=TDTestCase() +case.test_case1() \ No newline at end of file From d0766f14607104b5ecc54a2e653ddf65ebda16cd Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Sat, 7 May 2022 23:57:49 +0800 Subject: [PATCH 03/14] update --- tests/system-test/0-others/taosdMonitor.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/system-test/0-others/taosdMonitor.py b/tests/system-test/0-others/taosdMonitor.py index 70e7407171..ae2e9e5c31 100644 --- a/tests/system-test/0-others/taosdMonitor.py +++ b/tests/system-test/0-others/taosdMonitor.py @@ -54,13 +54,13 @@ def telemetryInfoCheck(infoDict=''): if "master_uptime" not in infoDict["cluster_info"] or infoDict["cluster_info"]["master_uptime"] == None: tdLog.exit("master_uptime is null!") - if "monitor_interval" not in infoDict["cluster_info"] or infoDict["cluster_info"]["monitor_interval"] != 5: + if "monitor_interval" not in infoDict["cluster_info"] or infoDict["cluster_info"]["monitor_interval"] !=5: tdLog.exit("monitor_interval is null!") - if "vgroups_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_total"] != 2: + if "vgroups_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_total"] < 0: tdLog.exit("vgroups_total is null!") - if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] != 2: + if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] < 0: tdLog.exit("vgroups_alive is null!") if "connections_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["connections_total"] != 1: @@ -86,11 +86,9 @@ def telemetryInfoCheck(infoDict=''): if "vgroup_infos" not in infoDict or infoDict["vgroup_infos"]== None: tdLog.exit("vgroup_infos is null!") - vgroup_infos_lists = [{ "vgroup_id":2, "database_name":"db", "tables_num":0,"status": "ready", "vnodes": [{"dnode_id":1,"vnode_role": "LEADER" }]}, - { "vgroup_id":3, "database_name":"db", "tables_num": 0,"status":"ready", "vnodes": [{"dnode_id":1,"vnode_role": "LEADER" }]} - ] + vgroup_infos_nums = len(infoDict["vgroup_infos"]) - for index , vgroup_infos in enumerate(vgroup_infos_lists): + for index in range(vgroup_infos_nums): if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0: tdLog.exit("vgroup_id is null!") if "database_name" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["database_name"]!="db": From 182b13b354faf8b3d980b92247c5a62e51287621 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Sun, 8 May 2022 12:14:16 +0800 Subject: [PATCH 04/14] update --- tests/system-test/0-others/taosdMonitor.py | 34 +++++++++++----------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tests/system-test/0-others/taosdMonitor.py b/tests/system-test/0-others/taosdMonitor.py index ae2e9e5c31..38c1926290 100644 --- a/tests/system-test/0-others/taosdMonitor.py +++ b/tests/system-test/0-others/taosdMonitor.py @@ -63,7 +63,7 @@ def telemetryInfoCheck(infoDict=''): if "vgroups_alive" not in infoDict["cluster_info"] or infoDict["cluster_info"]["vgroups_alive"] < 0: tdLog.exit("vgroups_alive is null!") - if "connections_total" not in infoDict["cluster_info"] or 
infoDict["cluster_info"]["connections_total"] != 1: + if "connections_total" not in infoDict["cluster_info"] or infoDict["cluster_info"]["connections_total"] < 0 : tdLog.exit("connections_total is null!") if "dnodes" not in infoDict["cluster_info"] or infoDict["cluster_info"]["dnodes"] == None : @@ -91,15 +91,15 @@ def telemetryInfoCheck(infoDict=''): for index in range(vgroup_infos_nums): if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0: tdLog.exit("vgroup_id is null!") - if "database_name" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["database_name"]!="db": + if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"])>0: tdLog.exit("database_name is null!") if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0: tdLog.exit("tables_num is null!") - if "status" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["status"]!= "ready": + if "status" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["status"]) > 0 : tdLog.exit("status is null!") if "vnodes" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vnodes"] ==None : tdLog.exit("vnodes is null!") - if "dnode_id" not in infoDict["vgroup_infos"][index]["vnodes"][0] or infoDict["vgroup_infos"][index]["vnodes"][0]["dnode_id"] !=1 : + if "dnode_id" not in infoDict["vgroup_infos"][index]["vnodes"][0] or infoDict["vgroup_infos"][index]["vnodes"][0]["dnode_id"] < 0 : tdLog.exit("vnodes is null!") # grant_info ==================================== @@ -134,24 +134,24 @@ def telemetryInfoCheck(infoDict=''): if "disk_infos" not in infoDict or infoDict["disk_infos"]== None: tdLog.exit("disk_infos is null!") - # # bug for data_dir - # if "datadir" not in infoDict["disk_infos"] or len(infoDict["disk_infos"]["datadir"]) <=0 : - # tdLog.exit("datadir is null!") + # bug for data_dir + if "datadir" not in infoDict["disk_infos"] or len(infoDict["disk_infos"]["datadir"]) <=0 : + tdLog.exit("datadir is null!") - # if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0: - # tdLog.exit("name is null!") + if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0: + tdLog.exit("name is null!") - # if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] <= 0: - # tdLog.exit("level is null!") + if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] <= 0: + tdLog.exit("level is null!") - # if "avail" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0: - # tdLog.exit("avail is null!") + if "avail" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0: + tdLog.exit("avail is null!") - # if "used" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["used"] <= 0: - # tdLog.exit("used is null!") + if "used" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["used"] <= 0: + tdLog.exit("used is null!") - # if "total" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["total"] <= 0: - # tdLog.exit("total is null!") + if "total" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["total"] <= 
0: + tdLog.exit("total is null!") if "logdir" not in infoDict["disk_infos"] or infoDict["disk_infos"]["logdir"]== None: From 85c16507be014acb35bf1a2575c5280d37c61206 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Sun, 8 May 2022 12:29:54 +0800 Subject: [PATCH 05/14] update case again --- tests/system-test/0-others/taosdMonitor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/0-others/taosdMonitor.py b/tests/system-test/0-others/taosdMonitor.py index 38c1926290..8f420c6516 100644 --- a/tests/system-test/0-others/taosdMonitor.py +++ b/tests/system-test/0-others/taosdMonitor.py @@ -89,9 +89,9 @@ def telemetryInfoCheck(infoDict=''): vgroup_infos_nums = len(infoDict["vgroup_infos"]) for index in range(vgroup_infos_nums): - if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0: + if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]< 0: tdLog.exit("vgroup_id is null!") - if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"])>0: + if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"]) < 0: tdLog.exit("database_name is null!") if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0: tdLog.exit("tables_num is null!") From 4051dadaf185289371c9b4bf7fb74e3319d5e1f0 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Sun, 8 May 2022 12:35:51 +0800 Subject: [PATCH 06/14] fix bug for disk_info data_dir --- tests/system-test/0-others/taosdMonitor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/taosdMonitor.py b/tests/system-test/0-others/taosdMonitor.py index 8f420c6516..a3d3b05204 100644 --- a/tests/system-test/0-others/taosdMonitor.py +++ b/tests/system-test/0-others/taosdMonitor.py @@ -89,13 +89,13 @@ def telemetryInfoCheck(infoDict=''): vgroup_infos_nums = len(infoDict["vgroup_infos"]) for index in range(vgroup_infos_nums): - if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]< 0: + if "vgroup_id" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vgroup_id"]<0: tdLog.exit("vgroup_id is null!") if "database_name" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["database_name"]) < 0: tdLog.exit("database_name is null!") if "tables_num" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["tables_num"]!= 0: tdLog.exit("tables_num is null!") - if "status" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["status"]) > 0 : + if "status" not in infoDict["vgroup_infos"][index] or len(infoDict["vgroup_infos"][index]["status"]) < 0 : tdLog.exit("status is null!") if "vnodes" not in infoDict["vgroup_infos"][index] or infoDict["vgroup_infos"][index]["vnodes"] ==None : tdLog.exit("vnodes is null!") @@ -141,7 +141,7 @@ def telemetryInfoCheck(infoDict=''): if "name" not in infoDict["disk_infos"]["datadir"][0] or len(infoDict["disk_infos"]["datadir"][0]["name"]) <= 0: tdLog.exit("name is null!") - if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] <= 0: + if "level" not in infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["level"] < 0: tdLog.exit("level is null!") if "avail" not in 
infoDict["disk_infos"]["datadir"][0] or infoDict["disk_infos"]["datadir"][0]["avail"] <= 0: From 2bff14b528c8c51d8cdc759cf95981ee6b7b0688 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 8 May 2022 22:22:47 +0800 Subject: [PATCH 07/14] refactor: adjust log --- include/common/tmsgdef.h | 1 - include/util/taoserror.h | 1 - source/dnode/mnode/impl/src/mndDnode.c | 1 - source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeCfg.c | 2 -- source/dnode/vnode/src/vnd/vnodeQuery.c | 3 +-- source/dnode/vnode/src/vnd/vnodeSvr.c | 1 - source/dnode/vnode/src/vnd/vnodeSync.c | 6 ------ 8 files changed, 2 insertions(+), 15 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 36bef5e85a..8e918c40f9 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -204,7 +204,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) - // sync integration TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 4e843aeb59..6a57b8d53e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -411,7 +411,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909) #define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A) -// sync integration #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index a55cf41262..e522be0629 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -335,7 +335,6 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) { } bool roleChanged = false; for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { - // sync integration if (pVgroup->vnodeGid[vg].dnodeId == statusReq.dnodeId) { if (pVgroup->vnodeGid[vg].role != pVload->syncState) { roleChanged = true; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 8850b35397..ebf49c644b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -157,7 +157,7 @@ struct SVnodeCfg { bool isWeak; STsdbCfg tsdbCfg; SWalCfg walCfg; - SSyncCfg syncCfg; // sync integration + SSyncCfg syncCfg; uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 5e21abb404..32866d7469 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -97,7 +97,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; - // sync integration if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; SJson *pNodeInfoArr = tjsonCreateArray(); @@ -157,7 +156,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; if (tjsonGetNumberValue(pJson, "hashMethod", 
pCfg->hashMethod) < 0) return -1; - // sync integration if (tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1; if (tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 7e80eacf8f..3e869650bf 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -124,8 +124,7 @@ _exit: int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = TD_VID(pVnode); - // pLoad->syncState = TAOS_SYNC_STATE_LEADER; - pLoad->syncState = syncGetMyRole(pVnode->sync); // sync integration + pLoad->syncState = syncGetMyRole(pVnode->sync); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; pLoad->totalStorage = 300; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8f7c11436d..ee6bdc1a59 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -198,7 +198,6 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data); } -// sync integration int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (syncEnvIsStart()) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 26393394e9..1260f9a3e7 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -14,12 +14,6 @@ */ #include "vnd.h" -// #include "sync.h" -// #include "syncTools.h" -// #include "tmsgcb.h" -// #include "vnodeInt.h" - -// sync integration int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { SSyncInfo syncInfo; From 4076520ac8709fb616f266e5bef8be0969d18935 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 09:51:51 +0800 Subject: [PATCH 08/14] feat: make show transactions and kill transaction work --- include/common/systable.h | 29 +++++++++---------- include/common/tglobal.h | 15 +++++----- source/common/src/systable.c | 1 + source/common/src/tglobal.c | 10 +++++-- source/dnode/mnode/impl/src/mndTrans.c | 7 +++-- source/dnode/mnode/impl/src/mnode.c | 8 ++--- source/dnode/mnode/impl/test/trans/trans2.cpp | 2 +- 7 files changed, 41 insertions(+), 31 deletions(-) diff --git a/include/common/systable.h b/include/common/systable.h index bd8aae998f..506bdcfa8a 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -23,7 +23,6 @@ extern "C" { #define TDENGINE_SYSTABLE_H #define TSDB_INFORMATION_SCHEMA_DB "information_schema" -#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_INS_TABLE_DNODES "dnodes" #define TSDB_INS_TABLE_MNODES "mnodes" #define TSDB_INS_TABLE_MODULES "modules" @@ -44,27 +43,27 @@ extern "C" { #define TSDB_INS_TABLE_VNODES "vnodes" #define TSDB_INS_TABLE_CONFIGS "configs" -#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" -#define TSDB_PERFS_TABLE_SMAS "smas" -#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" -#define TSDB_PERFS_TABLE_CONNECTIONS "connections" -#define TSDB_PERFS_TABLE_QUERIES "queries" -#define TSDB_PERFS_TABLE_TOPICS "topics" -#define TSDB_PERFS_TABLE_CONSUMERS "consumers" -#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" -#define TSDB_PERFS_TABLE_OFFSETS "offsets" -#define TSDB_PERFS_TABLE_TRANS "trans" -#define TSDB_PERFS_TABLE_STREAMS "streams" +#define 
TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" +#define TSDB_PERFS_TABLE_SMAS "smas" +#define TSDB_PERFS_TABLE_SUBSCRIBES "subscribes" +#define TSDB_PERFS_TABLE_CONNECTIONS "connections" +#define TSDB_PERFS_TABLE_QUERIES "queries" +#define TSDB_PERFS_TABLE_TOPICS "topics" +#define TSDB_PERFS_TABLE_CONSUMERS "consumers" +#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions" +#define TSDB_PERFS_TABLE_OFFSETS "offsets" +#define TSDB_PERFS_TABLE_TRANS "trans" +#define TSDB_PERFS_TABLE_STREAMS "streams" typedef struct SSysDbTableSchema { - const char *name; + const char* name; const int32_t type; const int32_t bytes; } SSysDbTableSchema; typedef struct SSysTableMeta { - const char *name; - const SSysDbTableSchema *schema; + const char* name; + const SSysDbTableSchema* schema; const int32_t colNum; } SSysTableMeta; diff --git a/include/common/tglobal.h b/include/common/tglobal.h index f253d31963..da5158abb5 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -17,8 +17,8 @@ #define _TD_COMMON_GLOBAL_H_ #include "tarray.h" -#include "tdef.h" #include "tconfig.h" +#include "tdef.h" #ifdef __cplusplus extern "C" { @@ -121,15 +121,16 @@ extern char tsCompressor[]; extern int32_t tsDiskCfgNum; extern SDiskCfg tsDiskCfg[]; -// internal -extern int32_t tsTransPullupMs; -extern int32_t tsMaRebalanceMs; +// internal +extern int32_t tsTransPullupInterval; +extern int32_t tsMqRebalanceInterval; #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) -int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile, - char *apolloUrl, SArray *pArgs, bool tsc); -int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); +int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, + const char *envFile, char *apolloUrl, SArray *pArgs, bool tsc); +int32_t taosInitCfg(const char *cfgDir, const char **envCmd, const char *envFile, char *apolloUrl, SArray *pArgs, + bool tsc); void taosCleanupCfg(); void taosCfgDynamicOptions(const char *option, const char *value); void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 51f924280a..fba14bbaf5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -211,6 +211,7 @@ static const SSysDbTableSchema transSchema[] = { {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "type", .bytes = TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "last_error", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 87a9c521af..c878109711 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -170,8 +170,8 @@ uint32_t tsCurRange = 100; // range char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR // internal -int32_t tsTransPullupMs = 6000; -int32_t tsMaRebalanceMs = 2000; +int32_t tsTransPullupInterval = 6; +int32_t tsMqRebalanceInterval = 2; void 
taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN); @@ -438,6 +438,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "telemetryServer", tsTelemServer, 0) != 0) return -1; if (cfgAddInt32(pCfg, "telemetryPort", tsTelemPort, 1, 65056, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; + return 0; } @@ -575,6 +578,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; + tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; + tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; + if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3d4254da03..5085de8610 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1405,15 +1405,18 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); + STR_WITH_MAXSIZE_TO_VARSTR(type, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)type, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); char lastError[TSDB_TRANS_ERROR_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes); + STR_WITH_MAXSIZE_TO_VARSTR(lastError, pTrans->lastError, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)lastError, false); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index e2814e95f0..690399f099 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -65,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } - taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer); + taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer); } static void mndCalMqRebalance(void *param, void *tmrId) { @@ -81,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } - taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer); + taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer); } static void mndPullupTelem(void *param, void *tmrId) { @@ -103,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } - if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, 
pMnode->timer, &pMnode->transTimer)) { + if (taosTmrReset(mndPullupTrans, tsTransPullupInterval * 1000, pMnode, pMnode->timer, &pMnode->transTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) { + if (taosTmrReset(mndCalMqRebalance, tsMqRebalanceInterval * 1000, pMnode, pMnode->timer, &pMnode->mqTimer)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index b3cbcb6898..974c86b423 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -58,7 +58,7 @@ class MndTestTrans2 : public ::testing::Test { strcpy(opt.replicas[0].fqdn, "localhost"); opt.msgCb = msgCb; - tsTransPullupMs = 1000; + tsTransPullupInterval = 1; const char *mnodepath = "/tmp/mnode_test_trans"; taosRemoveDir(mnodepath); From 92f56dd817b4d63731ce1cd3adaf69194a587d67 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 May 2022 09:52:09 +0800 Subject: [PATCH 09/14] test: add transaction test case --- tests/script/jenkins/basic.txt | 2 + tests/script/tsim/trans/create_db.sim | 166 ++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 tests/script/tsim/trans/create_db.sim diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 8cce169d99..f2b9e0caab 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -62,6 +62,8 @@ # ---- tstream ./test.sh -f tsim/tstream/basic0.sim +# ---- transaction + ./test.sh -f tsim/trans/create_db.sim # ---- tmq ./test.sh -f tsim/tmq/basic1.sim diff --git a/tests/script/tsim/trans/create_db.sim b/tests/script/tsim/trans/create_db.sim new file mode 100644 index 0000000000..0db5add88a --- /dev/null +++ b/tests/script/tsim/trans/create_db.sim @@ -0,0 +1,166 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1 +system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sql connect + +print =============== show dnodes +sql show dnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +sql show mnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data02 != LEADER then + return -1 +endi + +print =============== create dnodes +sql create dnode $hostname port 7200 +sleep 2000 + +sql show dnodes; +if $rows != 2 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + +if $data10 != 2 then + return -1 +endi + +print =============== kill dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +print =============== create database +sql show transactions +if $rows != 0 then + return -1 +endi + +sql_error create database d1 vgroups 2; + +print =============== show transactions +sql show transactions +if $rows != 1 then + return -1 +endi + +if $data[0][0] != 2 then + return -1 +endi + +if $data[0][2] != undoAction then + return -1 +endi + +if $data[0][3] != d1 then + return -1 +endi + +if $data[0][4] != create-db then + return -1 +endi + +if $data[0][7] != @Unable to establish connection@ then + return -1 +endi + +sql_error create database d1 vgroups 2; + +print =============== start dnode2 +system sh/exec.sh -n dnode2 -s start +sleep 3000 + +sql show transactions +if $rows != 0 then + return -1 +endi + +sql 
From ede2bc03fcec9e03e710604fd561f7087995fea3 Mon Sep 17 00:00:00 2001
From: Shengliang Guan
Date: Mon, 9 May 2022 11:04:16 +0800
Subject: [PATCH 10/14] refactor: node mgmt

---
 source/dnode/mgmt/implement/src/dmEps.c     |  30 +++---
 source/dnode/mgmt/implement/src/dmWorker.c  |  28 ++++--
 source/dnode/mgmt/mgmt_bnode/src/bmWorker.c |  24 ++++-
 source/dnode/mgmt/mgmt_mnode/src/mmFile.c   |  22 ++--
 source/dnode/mgmt/mgmt_mnode/src/mmInt.c    |   4 +-
 source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 106 +++++++++++++-------
 source/dnode/mgmt/mgmt_qnode/src/qmWorker.c |  45 +++++----
 source/dnode/mgmt/mgmt_snode/src/smWorker.c |  48 +++++----
 8 files changed, 192 insertions(+), 115 deletions(-)

diff --git a/source/dnode/mgmt/implement/src/dmEps.c b/source/dnode/mgmt/implement/src/dmEps.c
index 853c238316..f5c9a1d91b 100644
--- a/source/dnode/mgmt/implement/src/dmEps.c
+++ b/source/dnode/mgmt/implement/src/dmEps.c
@@ -51,7 +51,7 @@ int32_t dmReadEps(SDnode *pDnode) {
   pDnode->data.dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
   if (pDnode->data.dnodeEps == NULL) {
     dError("failed to calloc dnodeEp array since %s", strerror(errno));
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }

   snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
@@ -59,53 +59,53 @@ int32_t dmReadEps(SDnode *pDnode) {
   if (pFile == NULL) {
     // dDebug("file %s not exist", file);
     code = 0;
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }

   len = (int32_t)taosReadFile(pFile, content, maxLen);
   if (len <= 0) {
     dError("failed to read %s since content is null", file);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }

   content[len] = 0;
   root = cJSON_Parse(content);
   if (root == NULL) {
     dError("failed to read %s since invalid json format", file);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }

   cJSON *dnodeId = cJSON_GetObjectItem(root, "dnodeId");
   if (!dnodeId || dnodeId->type != cJSON_Number) {
     dError("failed to read %s since dnodeId not found", file);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }
   pDnode->data.dnodeId = dnodeId->valueint;

   cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
   if (!clusterId || clusterId->type != cJSON_String) {
     dError("failed to read %s since clusterId not found", file);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }
   pDnode->data.clusterId = atoll(clusterId->valuestring);

   cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
   if (!dropped || dropped->type != cJSON_Number) {
     dError("failed to read %s since dropped not found", file);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }
   pDnode->data.dropped = dropped->valueint;

   cJSON *dnodes = cJSON_GetObjectItem(root, "dnodes");
   if (!dnodes || dnodes->type != cJSON_Array) {
     dError("failed to read %s since dnodes not found", file);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }

   int32_t numOfDnodes = cJSON_GetArraySize(dnodes);
   if (numOfDnodes <= 0) {
     dError("failed to read %s since numOfDnodes:%d invalid", file, numOfDnodes);
-    goto PRASE_DNODE_OVER;
+    goto _OVER;
   }

   for (int32_t i = 0; i < numOfDnodes; ++i) {
@@ -117,7 +117,7 @@ int32_t dmReadEps(SDnode *pDnode) {
     cJSON *did = cJSON_GetObjectItem(node, "id");
     if (!did || did->type != cJSON_Number) {
       dError("failed to read %s since dnodeId not found", file);
-      goto PRASE_DNODE_OVER;
+      goto _OVER;
     }

     dnodeEp.id = did->valueint;
@@ -125,14 +125,14 @@ int32_t dmReadEps(SDnode *pDnode) {
     cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn");
     if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) {
       dError("failed to read %s since dnodeFqdn not found", file);
-      goto PRASE_DNODE_OVER;
+      goto _OVER;
     }
     tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN);

     cJSON *dnodePort = cJSON_GetObjectItem(node, "port");
     if (!dnodePort || dnodePort->type != cJSON_Number) {
       dError("failed to read %s since dnodePort not found", file);
-      goto PRASE_DNODE_OVER;
+      goto _OVER;
     }

     dnodeEp.ep.port = dnodePort->valueint;
@@ -140,7 +140,7 @@ int32_t dmReadEps(SDnode *pDnode) {
     cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode");
     if (!isMnode || isMnode->type != cJSON_Number) {
       dError("failed to read %s since isMnode not found", file);
-      goto PRASE_DNODE_OVER;
+      goto _OVER;
     }
     dnodeEp.isMnode = isMnode->valueint;

@@ -151,7 +151,7 @@ int32_t dmReadEps(SDnode *pDnode) {
   dDebug("succcessed to read file %s", file);
   dmPrintEps(pDnode);

-PRASE_DNODE_OVER:
+_OVER:
   if (content != NULL) taosMemoryFree(content);
   if (root != NULL) cJSON_Delete(root);
   if (pFile != NULL) taosCloseFile(&pFile);
@@ -176,7 +176,7 @@ PRASE_DNODE_OVER:
 int32_t dmWriteEps(SDnode *pDnode) {
   char file[PATH_MAX] = {0};
-  char realfile[PATH_MAX];
+  char realfile[PATH_MAX] = {0};

   snprintf(file, sizeof(file), "%s%sdnode.json.bak", pDnode->wrappers[DNODE].path, TD_DIRSEP);
   snprintf(realfile, sizeof(realfile), "%s%sdnode.json", pDnode->wrappers[DNODE].path, TD_DIRSEP);
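Review note: the dmEps.c hunks are mostly a rename of the cleanup label (PRASE_DNODE_OVER to _OVER), but the underlying parse-validate-cleanup shape is worth stating once. A self-contained sketch of the same pattern against the public cJSON API; the "dnodeId" field and readDnodeId helper are illustrative only:

    #include <stdio.h>
    #include <cJSON.h>

    // Parse {"dnodeId": N}; every failure path funnels through one cleanup label.
    static int readDnodeId(const char *content, int *dnodeId) {
      int    code = -1;
      cJSON *root = cJSON_Parse(content);
      if (root == NULL) goto _OVER;  // invalid json format

      cJSON *id = cJSON_GetObjectItem(root, "dnodeId");
      if (!id || id->type != cJSON_Number) goto _OVER;  // field missing or wrong type

      *dnodeId = id->valueint;
      code = 0;

    _OVER:
      if (root != NULL) cJSON_Delete(root);  // safe on every exit path
      return code;
    }

    int main(void) {
      int id = 0;
      printf("%d %d\n", readDnodeId("{\"dnodeId\": 7}", &id), id);  // prints: 0 7
      return 0;
    }

The single label keeps the resource release in one place, which is exactly why the rename to a short, uniform _OVER across files is low-risk.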
diff --git a/source/dnode/mgmt/implement/src/dmWorker.c b/source/dnode/mgmt/implement/src/dmWorker.c
index b19c2ab36b..72b2111591 100644
--- a/source/dnode/mgmt/implement/src/dmWorker.c
+++ b/source/dnode/mgmt/implement/src/dmWorker.c
@@ -105,12 +105,13 @@ void dmStopMonitorThread(SDnode *pDnode) {
 }

 static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
-  SDnode * pDnode = pInfo->ahandle;
-  SRpcMsg *pRpc = &pMsg->rpcMsg;
-  int32_t  code = -1;
+  SDnode *pDnode = pInfo->ahandle;
+
+  int32_t code = -1;
+  tmsg_t  msgType = pMsg->rpcMsg.msgType;
   dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg);

-  switch (pRpc->msgType) {
+  switch (msgType) {
     case TDMT_DND_CONFIG_DNODE:
       code = dmProcessConfigReq(pDnode, pMsg);
       break;
@@ -148,9 +149,14 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
       break;
   }

-  if (pRpc->msgType & 1u) {
-    if (code != 0) code = terrno;
-    SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId};
+  if (msgType & 1u) {
+    if (code != 0 && terrno != 0) code = terrno;
+    SRpcMsg rsp = {
+        .handle = pMsg->rpcMsg.handle,
+        .ahandle = pMsg->rpcMsg.ahandle,
+        .code = code,
+        .refId = pMsg->rpcMsg.refId,
+    };
     rpcSendResponse(&rsp);
   }

@@ -160,7 +166,13 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
 }

 int32_t dmStartWorker(SDnode *pDnode) {
-  SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pDnode};
+  SSingleWorkerCfg cfg = {
+      .min = 1,
+      .max = 1,
+      .name = "dnode-mgmt",
+      .fp = (FItem)dmProcessMgmtQueue,
+      .param = pDnode,
+  };
   if (tSingleWorkerInit(&pDnode->data.mgmtWorker, &cfg) != 0) {
     dError("failed to start dnode-mgmt worker since %s", terrstr());
     return -1;
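Review note: the dmWorker.c hunk leans on the convention that request message types have the low bit set, so `msgType & 1u` distinguishes requests (which need a response) from one-way notifications. A minimal hedged sketch of the dispatch-then-maybe-respond shape; SRpcMsg fields and rpcSendResponse are taken from the diff, demoProcessMsg is a placeholder, and the snippet assumes the taosd headers:

    // Sketch only; assumes the taosd message definitions are in scope.
    static void demoProcessMsg(SNodeMsg *pMsg) {
      int32_t code = -1;
      tmsg_t  msgType = pMsg->rpcMsg.msgType;

      // ... dispatch on msgType and set code ...

      if (msgType & 1u) {  // odd type => request; the peer is waiting for a reply
        if (code != 0 && terrno != 0) code = terrno;  // prefer the thread-local error
        SRpcMsg rsp = {
            .handle = pMsg->rpcMsg.handle,
            .ahandle = pMsg->rpcMsg.ahandle,
            .code = code,
            .refId = pMsg->rpcMsg.refId,
        };
        rpcSendResponse(&rsp);
      }
    }

Note the behavioral refinement in the patch: `code = terrno` is now taken only when terrno is actually set, so a failing handler that never populated terrno no longer reports a stale or zero error code.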
diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
index 230fa23674..d3204039e6 100644
--- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
+++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c
@@ -18,7 +18,11 @@

 static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) {
   SRpcMsg rpcRsp = {
-      .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code, .refId = pMsg->rpcMsg.refId};
+      .handle = pMsg->rpcMsg.handle,
+      .ahandle = pMsg->rpcMsg.ahandle,
+      .code = code,
+      .refId = pMsg->rpcMsg.refId,
+  };
   tmsgSendRsp(&rpcRsp);

   dTrace("msg:%p, is freed", pMsg);
@@ -103,7 +107,7 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
 }

 int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
-  SBnodeMgmt *  pMgmt = pWrapper->pMgmt;
+  SBnodeMgmt   *pMgmt = pWrapper->pMgmt;
   SMultiWorker *pWorker = &pMgmt->writeWorker;

   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
@@ -112,7 +116,7 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
 }

 int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
-  SBnodeMgmt *   pMgmt = pWrapper->pMgmt;
+  SBnodeMgmt    *pMgmt = pWrapper->pMgmt;
   SSingleWorker *pWorker = &pMgmt->monitorWorker;

   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
@@ -121,7 +125,12 @@ int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
 }

 int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
-  SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt};
+  SMultiWorkerCfg cfg = {
+      .max = 1,
+      .name = "bnode-write",
+      .fp = (FItems)bmProcessWriteQueue,
+      .param = pMgmt,
+  };
   if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
     dError("failed to start bnode-write worker since %s", terrstr());
     return -1;
@@ -129,7 +138,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {

   if (tsMultiProcess) {
     SSingleWorkerCfg mCfg = {
-        .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue, .param = pMgmt};
+        .min = 1,
+        .max = 1,
+        .name = "bnode-monitor",
+        .fp = (FItem)bmProcessMonitorQueue,
+        .param = pMgmt,
+    };
     if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
       dError("failed to start bnode-monitor worker since %s", terrstr());
       return -1;
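Review note: most hunks in this refactor convert one-line designated-initializer blobs into one-field-per-line form. The win is mechanical but real: a later field change becomes a one-line diff, and the trailing comma after the last field means adding a field never touches the line above it. In plain, standalone C (DemoWorkerCfg is a stand-in for SSingleWorkerCfg):

    #include <stdio.h>

    typedef struct {
      int         min;
      int         max;
      const char *name;
    } DemoWorkerCfg;  // stand-in for SSingleWorkerCfg

    int main(void) {
      // One field per line plus a trailing comma: future additions are one-line diffs.
      DemoWorkerCfg cfg = {
          .min = 1,
          .max = 4,
          .name = "demo-worker",
      };
      printf("%s: %d..%d threads\n", cfg.name, cfg.min, cfg.max);
      return 0;
    }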
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c
index 75c48e79eb..83c832a41e 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c
@@ -22,7 +22,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
   int32_t   maxLen = 4096;
   char     *content = taosMemoryCalloc(1, maxLen + 1);
   cJSON    *root = NULL;
-  char      file[PATH_MAX];
+  char      file[PATH_MAX] = {0};
   TdFilePtr pFile = NULL;

   snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
@@ -30,39 +30,39 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
   if (pFile == NULL) {
     // dDebug("file %s not exist", file);
     code = 0;
-    goto PRASE_MNODE_OVER;
+    goto _OVER;
   }

   len = (int32_t)taosReadFile(pFile, content, maxLen);
   if (len <= 0) {
     dError("failed to read %s since content is null", file);
-    goto PRASE_MNODE_OVER;
+    goto _OVER;
   }

   content[len] = 0;
   root = cJSON_Parse(content);
   if (root == NULL) {
     dError("failed to read %s since invalid json format", file);
-    goto PRASE_MNODE_OVER;
+    goto _OVER;
   }

   cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
   if (!deployed || deployed->type != cJSON_Number) {
     dError("failed to read %s since deployed not found", file);
-    goto PRASE_MNODE_OVER;
+    goto _OVER;
   }
   *pDeployed = deployed->valueint;

   cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
   if (!mnodes || mnodes->type != cJSON_Array) {
     dError("failed to read %s since nodes not found", file);
-    goto PRASE_MNODE_OVER;
+    goto _OVER;
   }

   pMgmt->replica = cJSON_GetArraySize(mnodes);
   if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
     dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
-    goto PRASE_MNODE_OVER;
+    goto _OVER;
   }

   for (int32_t i = 0; i < pMgmt->replica; ++i) {
@@ -74,21 +74,21 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
     cJSON *id = cJSON_GetObjectItem(node, "id");
     if (!id || id->type != cJSON_Number) {
       dError("failed to read %s since id not found", file);
-      goto PRASE_MNODE_OVER;
+      goto _OVER;
     }
     pReplica->id = id->valueint;

     cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
     if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
       dError("failed to read %s since fqdn not found", file);
-      goto PRASE_MNODE_OVER;
+      goto _OVER;
     }
     tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);

     cJSON *port = cJSON_GetObjectItem(node, "port");
     if (!port || port->type != cJSON_Number) {
       dError("failed to read %s since port not found", file);
-      goto PRASE_MNODE_OVER;
+      goto _OVER;
     }
     pReplica->port = port->valueint;
   }
@@ -96,7 +96,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
   code = 0;
   dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);

-PRASE_MNODE_OVER:
+_OVER:
   if (content != NULL) taosMemoryFree(content);
   if (root != NULL) cJSON_Delete(root);
   if (pFile != NULL) taosCloseFile(&pFile);
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
index db69b62e58..0bf846b7fc 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
@@ -161,9 +161,7 @@ static int32_t mmOpen(SMgmtWrapper *pWrapper) {
   SMnodeOpt option = {0};
   if (!deployed) {
     dInfo("mnode start to deploy");
-    // if (pWrapper->procType == DND_PROC_CHILD) {
-    pWrapper->pDnode->data.dnodeId = 1;
-    // }
+    pWrapper->pDnode->data.dnodeId = 1;
     mmBuildOptionForDeploy(pMgmt, &option);
   } else {
     dInfo("mnode start to open");
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
index 1f27b314e2..aac5bbc16a 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
@@ -17,42 +17,48 @@
 #include "mmInt.h"

 static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
-  SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
-                 .ahandle = pMsg->rpcMsg.ahandle,
-                 .refId = pMsg->rpcMsg.refId,
-                 .code = code,
-                 .pCont = pMsg->pRsp,
-                 .contLen = pMsg->rspLen};
+  SRpcMsg rsp = {
+      .handle = pMsg->rpcMsg.handle,
+      .ahandle = pMsg->rpcMsg.ahandle,
+      .refId = pMsg->rpcMsg.refId,
+      .code = code,
+      .pCont = pMsg->pRsp,
+      .contLen = pMsg->rspLen,
+  };
   tmsgSendRsp(&rsp);
 }

 static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
   SMnodeMgmt *pMgmt = pInfo->ahandle;
+
+  int32_t code = -1;
+  tmsg_t  msgType = pMsg->rpcMsg.msgType;
   dTrace("msg:%p, get from mnode queue", pMsg);

-  SRpcMsg *pRpc = &pMsg->rpcMsg;
-  int32_t  code = -1;
-
-  if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) {
-    code = mmProcessAlterReq(pMgmt, pMsg);
-  } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) {
-    code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg);
-  } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_LOAD) {
-    code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg);
-  } else {
-    pMsg->pNode = pMgmt->pMnode;
-    code = mndProcessMsg(pMsg);
+  switch (msgType) {
+    case TDMT_DND_ALTER_MNODE:
+      code = mmProcessAlterReq(pMgmt, pMsg);
+      break;
+    case TDMT_MON_MM_INFO:
+      code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg);
+      break;
+    case TDMT_MON_MM_LOAD:
+      code = mmProcessGetMnodeLoadsReq(pMgmt->pWrapper, pMsg);
+      break;
+    default:
+      pMsg->pNode = pMgmt->pMnode;
+      code = mndProcessMsg(pMsg);
   }

-  if (pRpc->msgType & 1U) {
-    if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
+  if (msgType & 1U) {
+    if (pMsg->rpcMsg.handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
       if (code != 0 && terrno != 0) code = terrno;
       mmSendRsp(pMsg, code);
     }
   }

   dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
-  rpcFreeCont(pRpc->pCont);
+  rpcFreeCont(pMsg->rpcMsg.pCont);
   taosFreeQitem(pMsg);
 }

@@ -78,38 +84,38 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
   taosFreeQitem(pMsg);
 }

-static void mmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
+static void mmPutNodeMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
   dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
   taosWriteQitem(pWorker->queue, pMsg);
 }

 int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
   SMnodeMgmt *pMgmt = pWrapper->pMgmt;
-  mmPutMsgToWorker(&pMgmt->writeWorker, pMsg);
+  mmPutNodeMsgToWorker(&pMgmt->writeWorker, pMsg);
   return 0;
 }

 int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
   SMnodeMgmt *pMgmt = pWrapper->pMgmt;
-  mmPutMsgToWorker(&pMgmt->syncWorker, pMsg);
+  mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
   return 0;
 }

 int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
   SMnodeMgmt *pMgmt = pWrapper->pMgmt;
-  mmPutMsgToWorker(&pMgmt->readWorker, pMsg);
+  mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
   return 0;
 }

 int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
   SMnodeMgmt *pMgmt = pWrapper->pMgmt;
-  mmPutMsgToWorker(&pMgmt->queryWorker, pMsg);
+  mmPutNodeMsgToWorker(&pMgmt->queryWorker, pMsg);
   return 0;
 }

 int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
   SMnodeMgmt *pMgmt = pWrapper->pMgmt;
-  mmPutMsgToWorker(&pMgmt->monitorWorker, pMsg);
+  mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
   return 0;
 }

@@ -144,40 +150,62 @@ int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
 }

 int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
-  SSingleWorkerCfg qCfg = {.min = tsNumOfMnodeQueryThreads,
-                           .max = tsNumOfMnodeQueryThreads,
-                           .name = "mnode-query",
-                           .fp = (FItem)mmProcessQueryQueue,
-                           .param = pMgmt};
+  SSingleWorkerCfg qCfg = {
+      .min = tsNumOfMnodeQueryThreads,
+      .max = tsNumOfMnodeQueryThreads,
+      .name = "mnode-query",
+      .fp = (FItem)mmProcessQueryQueue,
+      .param = pMgmt,
+  };
   if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
     dError("failed to start mnode-query worker since %s", terrstr());
     return -1;
   }

-  SSingleWorkerCfg rCfg = {.min = tsNumOfMnodeReadThreads,
-                           .max = tsNumOfMnodeReadThreads,
-                           .name = "mnode-read",
-                           .fp = (FItem)mmProcessQueue,
-                           .param = pMgmt};
+  SSingleWorkerCfg rCfg = {
+      .min = tsNumOfMnodeReadThreads,
+      .max = tsNumOfMnodeReadThreads,
+      .name = "mnode-read",
+      .fp = (FItem)mmProcessQueue,
+      .param = pMgmt,
+  };
   if (tSingleWorkerInit(&pMgmt->readWorker, &rCfg) != 0) {
     dError("failed to start mnode-read worker since %s", terrstr());
     return -1;
   }

-  SSingleWorkerCfg wCfg = {.min = 1, .max = 1, .name = "mnode-write", .fp = (FItem)mmProcessQueue, .param = pMgmt};
+  SSingleWorkerCfg wCfg = {
+      .min = 1,
+      .max = 1,
+      .name = "mnode-write",
+      .fp = (FItem)mmProcessQueue,
+      .param = pMgmt,
+  };
   if (tSingleWorkerInit(&pMgmt->writeWorker, &wCfg) != 0) {
     dError("failed to start mnode-write worker since %s", terrstr());
     return -1;
   }

-  SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
+  SSingleWorkerCfg sCfg = {
+      .min = 1,
+      .max = 1,
+      .name = "mnode-sync",
+      .fp = (FItem)mmProcessQueue,
+      .param = pMgmt,
+  };
   if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
     dError("failed to start mnode mnode-sync worker since %s", terrstr());
     return -1;
   }

   if (tsMultiProcess) {
-    SSingleWorkerCfg mCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt};
+    SSingleWorkerCfg mCfg = {
+        .min = 1,
+        .max = 1,
+        .name = "mnode-monitor",
+        .fp = (FItem)mmProcessQueue,
+        .param = pMgmt,
+    };
     if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
       dError("failed to start mnode mnode-monitor worker since %s", terrstr());
       return -1;
diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
index da85ee64a8..965d35cb3e 100644
--- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
@@ -17,12 +17,14 @@
 #include "qmInt.h"

 static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
-  SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
-                 .ahandle = pMsg->rpcMsg.ahandle,
-                 .refId = pMsg->rpcMsg.refId,
-                 .code = code,
-                 .pCont = pMsg->pRsp,
-                 .contLen = pMsg->rspLen};
+  SRpcMsg rsp = {
+      .handle = pMsg->rpcMsg.handle,
+      .ahandle = pMsg->rpcMsg.ahandle,
+      .refId = pMsg->rpcMsg.refId,
+      .code = code,
+      .pCont = pMsg->pRsp,
+      .contLen = pMsg->rspLen,
+  };
   tmsgSendRsp(&rsp);
 }

@@ -145,22 +147,26 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
 }

 int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
-  SSingleWorkerCfg queryCfg = {.min = tsNumOfVnodeQueryThreads,
-                               .max = tsNumOfVnodeQueryThreads,
-                               .name = "qnode-query",
-                               .fp = (FItem)qmProcessQueryQueue,
-                               .param = pMgmt};
+  SSingleWorkerCfg queryCfg = {
+      .min = tsNumOfVnodeQueryThreads,
+      .max = tsNumOfVnodeQueryThreads,
+      .name = "qnode-query",
+      .fp = (FItem)qmProcessQueryQueue,
+      .param = pMgmt,
+  };
   if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
     dError("failed to start qnode-query worker since %s", terrstr());
     return -1;
   }

-  SSingleWorkerCfg fetchCfg = {.min = tsNumOfQnodeFetchThreads,
-                               .max = tsNumOfQnodeFetchThreads,
-                               .name = "qnode-fetch",
-                               .fp = (FItem)qmProcessFetchQueue,
-                               .param = pMgmt};
+  SSingleWorkerCfg fetchCfg = {
+      .min = tsNumOfQnodeFetchThreads,
+      .max = tsNumOfQnodeFetchThreads,
+      .name = "qnode-fetch",
+      .fp = (FItem)qmProcessFetchQueue,
+      .param = pMgmt,
+  };
   if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
     dError("failed to start qnode-fetch worker since %s", terrstr());
@@ -169,7 +175,12 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {

   if (tsMultiProcess) {
     SSingleWorkerCfg mCfg = {
-        .min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonitorQueue, .param = pMgmt};
+        .min = 1,
+        .max = 1,
+        .name = "qnode-monitor",
+        .fp = (FItem)qmProcessMonitorQueue,
+        .param = pMgmt,
+    };
     if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
       dError("failed to start qnode-monitor worker since %s", terrstr());
       return -1;
diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c
index 25872aec55..2ae439bbd6 100644
--- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c
+++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c
@@ -17,12 +17,14 @@
 #include "smInt.h"

 static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
-  SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
-                 .ahandle = pMsg->rpcMsg.ahandle,
-                 .refId = pMsg->rpcMsg.refId,
-                 .code = code,
-                 .pCont = pMsg->pRsp,
-                 .contLen = pMsg->rspLen};
+  SRpcMsg rsp = {
+      .handle = pMsg->rpcMsg.handle,
+      .ahandle = pMsg->rpcMsg.ahandle,
+      .refId = pMsg->rpcMsg.refId,
+      .code = code,
+      .pCont = pMsg->pRsp,
+      .contLen = pMsg->rspLen,
+  };
   tmsgSendRsp(&rsp);
 }

@@ -90,7 +92,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
       return -1;
     }

-    SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
+    SMultiWorkerCfg cfg = {
+        .max = 1,
+        .name = "snode-unique",
+        .fp = smProcessUniqueQueue,
+        .param = pMgmt,
+    };
     if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
       dError("failed to start snode-unique worker since %s", terrstr());
       return -1;
@@ -101,11 +108,13 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
     }
   }

-  SSingleWorkerCfg cfg = {.min = tsNumOfSnodeSharedThreads,
-                          .max = tsNumOfSnodeSharedThreads,
-                          .name = "snode-shared",
-                          .fp = (FItem)smProcessSharedQueue,
-                          .param = pMgmt};
+  SSingleWorkerCfg cfg = {
+      .min = tsNumOfSnodeSharedThreads,
+      .max = tsNumOfSnodeSharedThreads,
+      .name = "snode-shared",
+      .fp = (FItem)smProcessSharedQueue,
+      .param = pMgmt,
+  };

   if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
     dError("failed to start snode shared-worker since %s", terrstr());
@@ -114,7 +123,12 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {

   if (tsMultiProcess) {
     SSingleWorkerCfg mCfg = {
-        .min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt};
+        .min = 1,
+        .max = 1,
+        .name = "snode-monitor",
+        .fp = (FItem)smProcessMonitorQueue,
+        .param = pMgmt,
+    };
     if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
       dError("failed to start snode-monitor worker since %s", terrstr());
       return -1;
@@ -150,7 +164,7 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
 }

 int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
-  SSnodeMgmt *  pMgmt = pWrapper->pMgmt;
+  SSnodeMgmt   *pMgmt = pWrapper->pMgmt;
   SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);

   if (pWorker == NULL) {
     terrno = TSDB_CODE_INVALID_MSG;
@@ -163,7 +177,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
 }

 int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
-  SSnodeMgmt *   pMgmt = pWrapper->pMgmt;
+  SSnodeMgmt    *pMgmt = pWrapper->pMgmt;
   SSingleWorker *pWorker = &pMgmt->monitorWorker;

   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
@@ -172,7 +186,7 @@ int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
 }

 int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
-  SSnodeMgmt *  pMgmt = pWrapper->pMgmt;
+  SSnodeMgmt   *pMgmt = pWrapper->pMgmt;
   int32_t       index = smGetSWIdFromMsg(&pMsg->rpcMsg);
   SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);

   if (pWorker == NULL) {
@@ -186,7 +200,7 @@ int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
 }

 int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
-  SSnodeMgmt *   pMgmt = pWrapper->pMgmt;
+  SSnodeMgmt    *pMgmt = pWrapper->pMgmt;
   SSingleWorker *pWorker = &pMgmt->sharedWorker;

   dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
From 8f3619ee893bac22a6b3c423e40310a665ec869d Mon Sep 17 00:00:00 2001
From: shenglian zhou
Date: Mon, 9 May 2022 11:34:35 +0800
Subject: [PATCH 11/14] feature(udf): enhance error process

---
 source/libs/function/src/tudf.c | 48 ++++++++++++++++++++-------------
 source/libs/scalar/src/scalar.c | 17 +++++++++---
 2 files changed, 44 insertions(+), 21 deletions(-)

diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c
index 84860ca5a1..75b6aeaae9 100644
--- a/source/libs/function/src/tudf.c
+++ b/source/libs/function/src/tudf.c
@@ -1259,7 +1259,9 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
     return false;
   }
   UdfcFuncHandle handle;
-  if (setupUdf((char*)pCtx->udfName, &handle) != 0) {
+  int32_t udfCode = 0;
+  if ((udfCode = setupUdf((char*)pCtx->udfName, &handle)) != 0) {
+    fnError("udfAggInit error. step setupUdf. udf code: %d", udfCode);
     return false;
   }
   SClientUdfUvSession *session = (SClientUdfUvSession *)handle;
@@ -1272,7 +1274,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
   udfRes->session = (SClientUdfUvSession *)handle;

   SUdfInterBuf buf = {0};
-  if (callUdfAggInit(handle, &buf) != 0) {
+  if ((udfCode = callUdfAggInit(handle, &buf)) != 0) {
+    fnError("udfAggInit error. step callUdfAggInit. udf code: %d", udfCode);
     return false;
   }
   udfRes->interResNum = buf.numOfResult;
@@ -1316,21 +1319,23 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
                        .numOfResult = udfRes->interResNum};
   SUdfInterBuf newState = {0};

-  callUdfAggProcess(session, inputBlock, &state, &newState);
-
-  udfRes->interResNum = newState.numOfResult;
-  memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
-
+  int32_t udfCode = callUdfAggProcess(session, inputBlock, &state, &newState);
+  if (udfCode != 0) {
+    fnError("udfAggProcess error. code: %d", udfCode);
+    newState.numOfResult = 0;
+  } else {
+    udfRes->interResNum = newState.numOfResult;
+    memcpy(udfRes->interResBuf, newState.buf, newState.bufLen);
+  }
   if (newState.numOfResult == 1 || state.numOfResult == 1) {
     GET_RES_INFO(pCtx)->numOfRes = 1;
   }

   blockDataDestroy(inputBlock);
-
   taosArrayDestroy(tempBlock.pDataBlock);

   taosMemoryFree(newState.buf);
-  return 0;
+  return TSDB_CODE_SUCCESS;
 }

 int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
@@ -1344,15 +1349,22 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
   SUdfInterBuf state = {.buf = udfRes->interResBuf,
                         .bufLen = session->bufSize,
                         .numOfResult = udfRes->interResNum};
-  callUdfAggFinalize(session, &state, &resultBuf);
-
-  udfRes->finalResBuf = resultBuf.buf;
-  udfRes->finalResNum = resultBuf.numOfResult;
-
-  teardownUdf(session);
-
-  if (resultBuf.numOfResult == 1) {
-    GET_RES_INFO(pCtx)->numOfRes = 1;
+  int32_t udfCallCode = 0;
+  udfCallCode = callUdfAggFinalize(session, &state, &resultBuf);
+  if (udfCallCode != 0) {
+    fnError("udfAggFinalize error. callUdfAggFinalize step. udf code:%d", udfCallCode);
+    GET_RES_INFO(pCtx)->numOfRes = 0;
+  } else {
+    memcpy(udfRes->finalResBuf, resultBuf.buf, session->outputLen);
+    udfRes->finalResNum = resultBuf.numOfResult;
+    GET_RES_INFO(pCtx)->numOfRes = udfRes->finalResNum;
   }
+
+  int32_t code = teardownUdf(session);
+  if (code != 0) {
+    fnError("udfAggFinalize error. teardownUdf step. udf code: %d", code);
+  }
+
   return functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
 }
\ No newline at end of file
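Review note: the tudf.c changes make every UDF stage (setup, init, process, finalize, teardown) report its own failure code instead of silently discarding it, while still running the later cleanup stages. The control-flow skeleton, reduced to standard C; the three stage functions below are placeholders for setupUdf/callUdfAgg*/teardownUdf:

    #include <stdio.h>

    static int setupStage(void)    { return 0; }
    static int workStage(void)     { return -1; }  // pretend the call into the UDF failed
    static int teardownStage(void) { return 0; }

    int run(void) {
      int code = setupStage();
      if (code != 0) {
        fprintf(stderr, "setup failed: %d\n", code);
        return code;  // nothing to tear down yet
      }

      int workCode = workStage();
      if (workCode != 0) {
        fprintf(stderr, "work failed: %d\n", workCode);
        // fall through: teardown must still run
      }

      int tdCode = teardownStage();
      if (tdCode != 0) fprintf(stderr, "teardown failed: %d\n", tdCode);

      return workCode != 0 ? workCode : tdCode;
    }

    int main(void) { return run() == 0 ? 0 : 1; }

udfAggFinalize follows exactly this shape: a failed callUdfAggFinalize zeroes the result count, but teardownUdf is still attempted and its code logged.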
diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c
index f5fab814ff..5231890821 100644
--- a/source/libs/scalar/src/scalar.c
+++ b/source/libs/scalar/src/scalar.c
@@ -347,10 +347,21 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp

   if (fmIsUserDefinedFunc(node->funcId)) {
     UdfcFuncHandle udfHandle = NULL;

-    SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
+    code = setupUdf(node->functionName, &udfHandle);
+    if (code != 0) {
+      sclError("fmExecFunction error. setupUdf. function name: %s, code:%d", node->functionName, code);
+      goto _return;
+    }
     code = callUdfScalarFunc(udfHandle, params, paramNum, output);
-    teardownUdf(udfHandle);
-    SCL_ERR_JRET(code);
+    if (code != 0) {
+      sclError("fmExecFunction error. callUdfScalarFunc. function name: %s, udf code:%d", node->functionName, code);
+      goto _return;
+    }
+    code = teardownUdf(udfHandle);
+    if (code != 0) {
+      sclError("fmExecFunction error. teardownUdf. function name: %s, udf code:%d", node->functionName, code);
+      goto _return;
+    }
   } else {
     SScalarFuncExecFuncs ffpSet = {0};
     code = fmGetScalarFuncExecFuncs(node->funcId, &ffpSet);
From 2c4b44b5909ef77bd3d0e2b8148588e1383ee4bf Mon Sep 17 00:00:00 2001
From: Cary Xu
Date: Mon, 9 May 2022 11:46:00 +0800
Subject: [PATCH 12/14] fix: merge data of mem and file

---
 source/dnode/vnode/src/tsdb/tsdbRead.c | 27 +++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)

diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index e0f8943a48..2d1800a92c 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -1985,7 +1985,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
     return;
   } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) {
     SSkipListNode* node = NULL;
-    TSKEY lastRowKey = TSKEY_INITIAL_VAL;
+    TSKEY lastKeyAppend = TSKEY_INITIAL_VAL;

     do {
       STSRow* row2 = NULL;
@@ -2019,8 +2019,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
         }

         numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
-                                        pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey);
-        // numOfRows += 1;
+                                        pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
+
         if (cur->win.skey == TSKEY_INITIAL_VAL) {
           cur->win.skey = key;
         }
@@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
         cur->win.ekey = key;
         cur->lastKey = key + step;
         cur->mixBlock = true;
-
         moveToNextRowInMem(pCheckInfo);
       } else if (key == tsArray[pos]) {  // data in buffer has the same timestamp of data in file block, ignore it
 #if 0
@@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
         }
 #endif
         if (TD_SUPPORT_UPDATE(pCfg->update)) {
-          doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
+          if(lastKeyAppend != key) {
+            lastKeyAppend = key;
+            ++curRow;
+          }
+          numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);

           if (rv1 != TD_ROW_SVER(row1)) {
             //  pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
@@ -2074,10 +2077,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
             //  pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
             rv2 = TD_ROW_SVER(row2);
           }
-
           numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
-                                          pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastRowKey);
-          // ++numOfRows;
+                                          pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
           if (cur->win.skey == TSKEY_INITIAL_VAL) {
             cur->win.skey = key;
           }
@@ -2118,11 +2119,19 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
       int32_t qstart = 0, qend = 0;
       getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);

-      numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
+      if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
+          (lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
+        ++curRow;
+      }
+      numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
       pos += (qend - qstart + 1) * step;
+      if (numOfRows > 0) {
+        curRow = numOfRows - 1;
+      }

       cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
       cur->lastKey = cur->win.ekey + step;
+      lastKeyAppend = cur->win.ekey;
     }
   } while (numOfRows < pTsdbReadHandle->outputCapacity);
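Review note: the fix tracks the timestamp of the last row actually appended (lastKeyAppend) so that, in update mode, a memory row and a file row carrying the same timestamp overwrite one output slot instead of producing two rows. The bookkeeping in isolation, as a simplified standalone model of the curRow/lastKeyAppend interplay rather than the tsdb code itself:

    #include <stdio.h>

    #define TSKEY_INITIAL_VAL (-1)

    // Append key into out[], collapsing consecutive duplicate timestamps onto one slot.
    static int appendKey(long out[], int curRow, long *lastKeyAppend, long key) {
      if (*lastKeyAppend != TSKEY_INITIAL_VAL && *lastKeyAppend != key) {
        ++curRow;  // new timestamp -> advance to a fresh slot
      }
      out[curRow] = key;  // same timestamp -> overwrite in place (update semantics)
      *lastKeyAppend = key;
      return curRow;
    }

    int main(void) {
      long out[8];
      long last = TSKEY_INITIAL_VAL;
      int  row = 0;
      long keys[] = {100, 100, 200, 200, 300};  // duplicates from a mem + file merge
      for (int i = 0; i < 5; ++i) row = appendKey(out, row, &last, keys[i]);
      for (int i = 0; i <= row; ++i) printf("%ld ", out[i]);  // prints: 100 200 300
      printf("\n");
      return 0;
    }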
From 4b6cae113f5217c9961db803f9cc5dab5ea27f13 Mon Sep 17 00:00:00 2001
From: Cary Xu
Date: Mon, 9 May 2022 11:51:18 +0800
Subject: [PATCH 13/14] enh: code optimization

---
 source/dnode/vnode/src/tsdb/tsdbRead.c | 41 +++++++++++++++-------------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 5ac325a220..9c86de06af 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -879,14 +879,14 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
   }
 }

-
-static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) {
+static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
+                                 TDRowVerT maxVer) {
   STSRow *rmem = NULL, *rimem = NULL;
   if (pCheckInfo->iter) {
     SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
     if (node != NULL) {
       rmem = (STSRow*)SL_GET_NODE_DATA(node);
-#if 0 // TODO: skiplist refactor
+#if 0  // TODO: skiplist refactor
       if (TD_ROW_VER(rmem) > maxVer) {
         rmem = NULL;
       }
@@ -898,7 +898,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
     SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
     if (node != NULL) {
       rimem = (STSRow*)SL_GET_NODE_DATA(node);
-#if 0 // TODO: skiplist refactor
+#if 0  // TODO: skiplist refactor
       if (TD_ROW_VER(rimem) > maxVer) {
         rimem = NULL;
       }
@@ -1677,7 +1677,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
       colIdOfRow2 = tdKvRowColIdAt(row2, k);
     }

-    if (colIdOfRow1 < colIdOfRow2) { // the most probability
+    if (colIdOfRow1 < colIdOfRow2) {  // the most probability
       if (colIdOfRow1 < pColInfo->info.colId) {
         ++j;
         continue;
@@ -1720,7 +1720,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
           ++(*curRow);
         }
         ++nResult;
-      } else if (update){
+      } else if (update) {
        mergeOption = 2;
       } else {
         mergeOption = 0;
@@ -1741,7 +1741,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
           ++(*curRow);
         }
         ++nResult;
-      } else if(update) {
+      } else if (update) {
         mergeOption = 2;
       } else {
         mergeOption = 0;
@@ -2018,8 +2018,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
           rv2 = TD_ROW_SVER(row2);
         }

-        numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
-                                        pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
+        numOfRows +=
+            mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
+                               pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
         if (cur->win.skey == TSKEY_INITIAL_VAL) {
           cur->win.skey = key;
         }
@@ -2062,7 +2063,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
         }
 #endif
         if (TD_SUPPORT_UPDATE(pCfg->update)) {
-          if(lastKeyAppend != key) {
+          if (lastKeyAppend != key) {
             lastKeyAppend = key;
             ++curRow;
           }
@@ -2076,8 +2077,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
             //  pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
             rv2 = TD_ROW_SVER(row2);
           }
-          numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
-                                          pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
+          numOfRows +=
+              mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
+                                 pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);

           if (cur->win.skey == TSKEY_INITIAL_VAL) {
             cur->win.skey = key;
@@ -2436,7 +2438,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
   int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);

   STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
-  STimeWindow win = TSWINDOW_INITIALIZER;
+  STimeWindow   win = TSWINDOW_INITIALIZER;

   while (true) {
     tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
@@ -2742,7 +2744,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
   STSchema* pSchema = NULL;
   TSKEY     lastRowKey = TSKEY_INITIAL_VAL;

-
   do {
     STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
     if (row == NULL) {
@@ -2767,8 +2768,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
       pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
       rv = TD_ROW_SVER(row);
     }
-    numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
-                                    NULL, pCfg->update, &lastRowKey);
+    numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
+                                    pSchema, NULL, pCfg->update, &lastRowKey);

     if (numOfRows >= maxRowsToRead) {
       moveToNextRowInMem(pCheckInfo);
@@ -2777,7 +2778,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int

   } while (moveToNextRowInMem(pCheckInfo));

-  taosMemoryFreeClear(pSchema); // free the STSChema
+  taosMemoryFreeClear(pSchema);  // free the STSChema

   assert(numOfRows <= maxRowsToRead);

@@ -2905,8 +2906,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
     //    if (ret != TSDB_CODE_SUCCESS) {
     //      return false;
     //    }
-    mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId,
-                       NULL, NULL, true, &lastRowKey);
+    mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
+                       pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
     taosMemoryFreeClear(pRow);

     // update the last key value
@@ -3475,7 +3476,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
   pDataBlockInfo->rows = cur->rows;
   pDataBlockInfo->window = cur->win;
-// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
+  // ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
 }

 /*
From 35c2f9e484dbc084f2363f082aeae701cdfee35e Mon Sep 17 00:00:00 2001
From: Cary Xu
Date: Mon, 9 May 2022 12:00:07 +0800
Subject: [PATCH 14/14] fix: code optimization

---
 source/dnode/vnode/src/tsdb/tsdbRead.c | 1 -
 1 file changed, 1 deletion(-)

diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 9c86de06af..9294718550 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -2120,7 +2120,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
       int32_t qstart = 0, qend = 0;
       getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);

-      lastKeyAppend = tsArray[qend];
       if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
           (lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {