From e2b5444c130b9a0e3943e7d1ac8990f4c2f04204 Mon Sep 17 00:00:00 2001
From: Alex Duan <417921451@qq.com>
Date: Mon, 3 Apr 2023 16:18:19 +0800
Subject: [PATCH] case: add walRetention.py case

---
 tests/system-test/0-others/walRetention.py | 425 +++++++++++++++++++++
 1 file changed, 425 insertions(+)
 create mode 100644 tests/system-test/0-others/walRetention.py

diff --git a/tests/system-test/0-others/walRetention.py b/tests/system-test/0-others/walRetention.py
new file mode 100644
index 0000000000..91ab07a892
--- /dev/null
+++ b/tests/system-test/0-others/walRetention.py
@@ -0,0 +1,425 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################

+# -*- coding: utf-8 -*-
+
+#
+# Verify that the wal_retention_period and wal_retention_size options work as expected
+#
+
+import taos
+
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.common import *
+from util.sqlset import *
+
+import os
+import threading
+import json
+import time
+from datetime import datetime
+from datetime import timedelta
+from os import path
+
+stopInsert = False
+
+
+#
+# -------------- util --------------------------
+#
+def pathSize(path):
+    # sum the sizes of all files under path, recursively, in bytes
+    total_size = 0
+    for dirpath, dirnames, filenames in os.walk(path):
+        for i in filenames:
+            # use join to concatenate all the components of the path
+            f = os.path.join(dirpath, i)
+            # use getsize to get the size in bytes and add it to the total
+            total_size += os.path.getsize(f)
+
+    print(" %s %.02f MB" % (path, total_size / 1024 / 1024))
+    return total_size
+
+
+# load json from file
+def jsonFromFile(jsonFile):
+    with open(jsonFile) as fp:
+        return json.load(fp)
+
+
+#
+# ----------------- class ------------------
+#
+
+# wal file object
+class WalFile:
+    def __init__(self, pathFile, fileName):
+        self.mtime = os.path.getmtime(pathFile)
+        self.startVer = int(fileName)
+        self.fsize = os.path.getsize(pathFile)
+        self.endVer = -1
+        self.pathFile = pathFile
+
+    # placeholder for per-file checks; currently unused
+    def needDelete(self, delTsLine):
+        return True
+
+
+# VNode object
+class VNode:
+    # init
+    def __init__(self, dnodeId, path, walPeriod, walSize):
+        self.path = path
+        self.dnodeId = dnodeId
+        self.vgId = 0
+        self.snapVer = 0
+        self.walPeriod = walPeriod
+        self.walSize = walSize
+        self.walFiles = []
+        self.load(path)
+
+    # load wal files and the meta-ver snapshot info from the vnode directory
+    def load(self, path):
+        # load wal
+        walPath = os.path.join(path, "wal")
+        metaFile = ""
+        with os.scandir(walPath) as items:
+            for item in items:
+                if item.is_file():
+                    fileName, fileExt = os.path.splitext(item.name)
+                    pathFile = os.path.join(walPath, item.name)
+                    if fileExt == ".log":
+                        self.walFiles.append(WalFile(pathFile, fileName))
+                    elif fileExt == "":
+                        if fileName[:8] == "meta-ver":
+                            metaFile = pathFile
+        # load config
+        tdLog.info(f' meta-ver file={metaFile}')
+        if metaFile != "":
+            jsonVer = jsonFromFile(metaFile)
+            metaNode = jsonVer["meta"]
+            self.snapVer = int(metaNode["snapshotVer"])
+
+        # sort by startVer in descending order
+        self.walFiles = sorted(self.walFiles, key=lambda x: x.startVer, reverse=True)
+        # set endVer: each file ends right before the next newer file starts
+        startVer = -1
+        for walFile in self.walFiles:
+            if startVer == -1:
+                startVer = walFile.startVer
+                continue
+            walFile.endVer = startVer - 1
+            startVer = walFile.startVer
+
+        # print summary
+        tdLog.info(f" ---- dnode{self.dnodeId} snapVer={self.snapVer} {self.path} --------")
+        for walFile in self.walFiles:
+            mt = datetime.fromtimestamp(walFile.mtime)
+            tdLog.info(f" {walFile.pathFile} {mt} startVer={walFile.startVer} endVer={walFile.endVer}")
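+
+    # A worked example of the endVer chaining above (illustrative values
+    # only): files with startVer 0, 100 and 250 sort descending to
+    # [250, 100, 0]. The newest file (250) keeps endVer == -1 because it is
+    # still being written; file 100 gets endVer = 250 - 1 = 249 and file 0
+    # gets endVer = 100 - 1 = 99, so every older file covers the closed
+    # version range [startVer, endVer].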
+
+    # a wal file may be deleted only when all of its versions are behind snapVer
+    def canDelete(self, walFile):
+        if walFile.endVer == -1:
+            # the newest file is still being written, never delete it
+            return False
+
+        if self.snapVer > walFile.endVer:
+            return True
+        return False
+
+    # get total wal log size
+    def getWalsSize(self):
+        size = 0
+        for walFile in self.walFiles:
+            size += walFile.fsize
+
+        return size
+
+    # check the retention rules for this vnode
+    def check_retention(self):
+        #
+        # check period
+        #
+        delta = self.walPeriod
+        if self.walPeriod == 0:
+            delta += 1 * 60   # 1 minute of grace time
+        elif self.walPeriod < 3600:
+            delta += 3 * 60   # 3 minutes of grace time
+        else:
+            delta += 5 * 60   # 5 minutes of grace time
+
+        delTsLine = datetime.now() - timedelta(seconds=delta)
+        delTs = delTsLine.timestamp()
+        for walFile in self.walFiles:
+            mt = datetime.fromtimestamp(walFile.mtime)
+            info = f" {walFile.pathFile} mt={mt} line={delTsLine} start={walFile.startVer} snap={self.snapVer} end={walFile.endVer}"
+            tdLog.info(info)
+            if walFile.mtime < delTs and self.canDelete(walFile):
+                # wait a moment, then check whether the file still exists
+                time.sleep(1)
+                if os.path.exists(walFile.pathFile):
+                    # report error
+                    tdLog.exit(f" wal file is expired and should have been deleted. \n {walFile.pathFile} \n modify time={mt} \n delTsLine={delTsLine}\n start={walFile.startVer} snap={self.snapVer} end={walFile.endVer}")
+                    return False
+
+        #
+        # check size
+        #
+        if self.walSize == 0:
+            return True
+
+        vnodeSize = self.getWalsSize()
+        if vnodeSize < self.walSize:
+            tdLog.info(f" wal size valid. {self.path} real={vnodeSize} set={self.walSize}")
+            return True
+
+        # size is over the limit, so every deletable file must already be gone
+        tdLog.info(f" wal size over the limit. {self.path} real={vnodeSize} set={self.walSize}")
+        for walFile in self.walFiles:
+            if self.canDelete(walFile):
+                # wait a moment, then check whether the file still exists
+                time.sleep(1)
+                if os.path.exists(walFile.pathFile):
+                    tdLog.exit(f" wal file size over the limit."
+                               f"\n wal file = {walFile.pathFile}"
+                               f"\n snapVer = {self.snapVer}"
+                               f"\n real = {vnodeSize} bytes"
+                               f"\n set = {self.walSize} bytes")
+                    return False
+        return True
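+
+
+# A worked example of the period check above (illustrative values only):
+# with wal_retention_period = 30s the grace window is 30 + 3 * 60 = 210
+# seconds, so a check running at 12:10:00 computes delTsLine = 12:06:30.
+# Any wal file last modified before that point whose versions are fully
+# behind snapVer must already have been deleted by the server.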
+
+
+# insert data asynchronously (runs in a worker thread)
+def thread_insert(testCase, tbname, insertTime):
+    print(f"start thread... {tbname} insert for {insertTime} seconds \n")
+    # note: insert_data() executes through the shared tdSql handle; the
+    # dedicated connection is only opened to keep the thread self-contained
+    new_conn = testCase.new_connect()
+    testCase.insert_data(tbname, insertTime)
+    new_conn.close()
+    print("end thread\n")
+
+
+# case
+class TDTestCase:
+    def init(self, conn, logSql, replicaVar=1):
+        self.ts = 1670000000000
+        self.replicaVar = int(replicaVar)
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor())
+        self.setsql = TDSetSql()
+        self.conn = conn
+
+        # init cluster path
+        projDir = __file__
+        pos = projDir.find("tests")
+        if pos != -1:
+            self.projDir = projDir[:pos]
+        else:
+            self.projDir = __file__
+        self.projDir += "sim/"
+
+        # udf path
+        self.udf_path = os.path.dirname(os.path.realpath(__file__)) + "/udfpy"
+        self.column_dict = {
+            'ts': 'timestamp',
+            'col1': 'tinyint',
+            'col2': 'smallint',
+            'col3': 'int',
+            'col4': 'bigint',
+            'col5': 'tinyint unsigned',
+            'col6': 'smallint unsigned',
+            'col7': 'int unsigned',
+            'col8': 'bigint unsigned',
+            'col9': 'float',
+            'col10': 'double',
+            'col11': 'bool',
+            'col12': 'varchar(120)',
+            'col13': 'nchar(100)',
+        }
+        self.tag_dict = {
+            't1': 'tinyint',
+            't2': 'smallint',
+            't3': 'int',
+            't4': 'bigint',
+            't5': 'tinyint unsigned',
+            't6': 'smallint unsigned',
+            't7': 'int unsigned',
+            't8': 'bigint unsigned',
+            't9': 'float',
+            't10': 'double',
+            't11': 'bool',
+            't12': 'varchar(120)',
+            't13': 'nchar(100)',
+        }
+
+    # open a new connection
+    def new_connect(self):
+        return taos.connect(host=self.conn._host,
+                            user=self.conn._user,
+                            password=self.conn._password,
+                            database=self.dbname,
+                            port=self.conn._port,
+                            config=self.conn._config)
+
+    def set_stb_sql(self, stbname, column_dict, tag_dict):
+        column_sql = ''
+        tag_sql = ''
+        for k, v in column_dict.items():
+            column_sql += f"{k} {v}, "
+        for k, v in tag_dict.items():
+            tag_sql += f"{k} {v}, "
+        create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})'
+        return create_stb_sql
+
+    def create_database(self, dbname, wal_period, wal_size_kb, vgroups):
+        self.wal_period = wal_period
+        self.wal_size = wal_size_kb * 1024  # compared against file sizes in bytes
+        self.vgroups = vgroups
+        self.dbname = dbname
+        tdSql.execute(f"create database {dbname} wal_retention_period {wal_period} wal_retention_size {wal_size_kb} vgroups {vgroups} replica 3")
+        tdSql.execute(f'use {dbname}')
+
+    # create stable and child tables
+    def create_table(self, stbname, tbname, count):
+        self.child_count = count
+        self.stbname = stbname
+        self.tbname = tbname
+
+        # create stable
+        create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict)
+        tdSql.execute(create_table_sql)
+
+        batch_size = 1000
+        # create child tables
+        for i in range(count):
+            ti = i % 128
+            tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
+            sql = f'create table {tbname}{i} using {stbname} tags({tags});'
+            tdSql.execute(sql)
+            if i % batch_size == 0:
+                tdLog.info(f" create child table {i} ...")
+
+        tdLog.info(f" create {count} child tables ok.")
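+
+    # Example of the SQL generated above (illustrative, using the column/tag
+    # dicts defined in init): set_stb_sql() builds
+    #   create stable meters (ts timestamp, col1 tinyint, ..., col13 nchar(100))
+    #       tags (t1 tinyint, ..., t13 nchar(100))
+    # and create_table() then binds child tables to it, e.g. for i = 0:
+    #   create table d0 using meters tags(0,0,0,0,0,0,0,0,0.0000,0.0000,true,"var0","nch0");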
database {self.dbname}") + print(f' cost={cost} j ={j} \n') + + if cost > insertTime and j > 1000: + tdLog.info(f" insert finished. cost time ={cost}s rows={j}") + return + + # create tmq + def create_tmq(self): + sql = f"create topic topic1_{self.dbname} as select ts, col1, concat(col12,t12) from {self.stbname};" + tdSql.execute(sql) + sql = f"create topic topic2_{self.dbname} as select * from {self.stbname};" + tdSql.execute(sql) + #tdLog.info(sql) + + def check_retention(self): + # flash database + tdSql.execute(f"flush database {self.dbname}") + time.sleep(0.5) + + vnodes = [] + # put all vnode to list + for dnode in os.listdir(self.projDir): + vnodeDir = self.projDir + f"{dnode}/data/vnode/" + print(f"vnodeDir={vnodeDir}") + # enum all vnode + for entry in os.listdir(vnodeDir): + entryPath = path.join(vnodeDir, entry) + if os.path.isdir(entryPath): + if path.exists(path.join(entryPath, "vnode.json")): + try: + vnode = VNode(i, entryPath, self.wal_period, self.wal_size) + vnodes.append(vnode) + except: + continue + + # do check + for vnode in vnodes: + vnode.check_retention() + + + # test db1 + def test_db(self, dbname, checkTime ,wal_period, wal_size_kb): + global stopInsert + # var + stable = "meters" + tbname = "d" + vgroups = 4 + count = 10 + rows = 1000000 + + # do + self.create_database(dbname, wal_period, wal_size_kb, vgroups) + self.create_table(stable, tbname, count) + + # create tmq + self.create_tmq() + + # insert data + + self.insert_data(tbname, checkTime) + + #stopInsert = False + #tobj = threading.Thread(target = thread_insert, args=(self, tbname, rows)) + #tobj.start() + + # check retention + tdLog.info(f" -------------- do check retention ---------------") + self.check_retention() + + + # stop insert and wait exit + tdLog.info(f" {dbname} stop insert ...") + stopInsert = True + tobj.join() + tdLog.info(f" {dbname} test_db end.") + + # run + def run(self): + # period + #self.test_db("db1", 10, 60, 0) + # size + #self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size + # period + size + self.test_db("db3", checkTime = 2*60, wal_period = 30, wal_size_kb=10) + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())