diff --git a/tests/army/storage/s3/azure.py b/tests/army/storage/s3/azure.py
new file mode 100644
index 0000000000..a8b821a50f
--- /dev/null
+++ b/tests/army/storage/s3/azure.py
@@ -0,0 +1,86 @@
+import requests
+import hmac
+import hashlib
+import base64
+from datetime import datetime
+from urllib.parse import urlparse, parse_qs
+import xml.etree.ElementTree as ET
+
+
+# Recursively convert an XML element tree into a dictionary
+def xml_to_dict(element):
+    if len(element) == 0:
+        return element.text
+    result = {}
+    for child in element:
+        child_data = xml_to_dict(child)
+        if child.tag in result:
+            if isinstance(result[child.tag], list):
+                result[child.tag].append(child_data)
+            else:
+                result[child.tag] = [result[child.tag], child_data]
+        else:
+            result[child.tag] = child_data
+    return result
+
+
+# Get the current UTC time in the HTTP date format required by x-ms-date
+def get_utc_now():
+    return datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
+
+
+class Azure:
+    def __init__(self, account_name, account_key, container_name):
+        self.account_name = account_name
+        self.account_key = account_key
+        self.container_name = container_name
+
+    def blob_list(self):
+        url = f'https://{self.account_name}.blob.core.windows.net/{self.container_name}?comp=list&restype=container&timeout=20'
+        return self.console_get(url)
+
+    def generate_signature(self, url):
+        date = get_utc_now()
+        version = '2021-08-06'
+        string_to_sign = (f"GET\n\n\n\n\n\n\n\n\n\n\n\n"
+                          f"x-ms-date:{date}\n"
+                          f"x-ms-version:{version}\n"
+                          f"/{self.account_name}/{self.container_name}")
+        query_params = parse_qs(urlparse(url).query)
+        for param in query_params:
+            string_to_sign += "\n%s:%s" % (param, query_params[param][0])
+        decoded_key = base64.b64decode(self.account_key)
+        signed_string = hmac.new(decoded_key, string_to_sign.encode('utf-8'), hashlib.sha256).digest()
+        signature = base64.b64encode(signed_string).decode('utf-8')
+        headers = {
+            'x-ms-date': date,
+            'x-ms-version': version,
+            'Authorization': f'SharedKey {self.account_name}:{signature}'
+        }
+        return headers
+
+    def console_get(self, url):
+        # Generate authorization headers
+        headers = self.generate_signature(url)
+        # Send the request
+        response = requests.get(url, headers=headers)
+        xml_data = response.text
+        # Parse the XML response
+        root = ET.fromstring(xml_data)
+
+        # Convert XML to a dictionary
+        data_dict = xml_to_dict(root)
+        return data_dict
+
+
+if __name__ == '__main__':
+    # Set request parameters
+    account_name = 'fd2d01cd892f844eeaa2273'
+    account_key = '1234'
+    container_name = 'td-test'
+
+    azure = Azure(account_name, account_key, container_name)
+    result = azure.blob_list()
+    # Print the blob names
+    for blob in result["Blobs"]["Blob"]:
+        print(blob["Name"])
diff --git a/tests/army/storage/s3/s3Basic.py b/tests/army/storage/s3/s3Basic.py
index c86c8b0b8e..bc55fe6f5c 100644
--- a/tests/army/storage/s3/s3Basic.py
+++ b/tests/army/storage/s3/s3Basic.py
@@ -129,7 +129,7 @@ class TDTestCase(TBase):
                 time.sleep(2)
                 sc.dnodeStart(1)
             loop += 1
-            # miggrate
+            # migrate
             self.migrateDbS3()
 
         # check can pass
diff --git a/tests/army/storage/s3/s3azure.py b/tests/army/storage/s3/s3azure.py
new file mode 100644
index 0000000000..8274db9354
--- /dev/null
+++ b/tests/army/storage/s3/s3azure.py
@@ -0,0 +1,391 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import sys
+import time
+import random
+
+import taos
+import frame
+import frame.etool
+import frame.eos
+import frame.eutil
+import requests
+
+from frame.log import *
+from frame.cases import *
+from frame.sql import *
+from frame.caseBase import *
+from frame.srvCtl import *
+from frame import *
+from frame.eos import *
+from azure import Azure
+
+#
+# 192.168.1.52 MINIO S3
+#
+
+'''
+s3EndPoint       http://192.168.1.52:9000
+s3AccessKey      'zOgllR6bSnw2Ah3mCNel:cdO7oXAu3Cqdb1rUdevFgJMi0LtRwCXdWKQx4bhX'
+s3BucketName     ci-bucket
+s3UploadDelaySec 60
+
+for test:
+"s3AccessKey" : "fGPPyYjzytw05nw44ViA:vK1VcwxgSOykicx6hk8fL1x15uEtyDSFU3w4hTaZ"
+"s3BucketName": "test-bucket"
+'''
+
+
+class TDTestCase(TBase):
+    def __init__(self):
+        self.fileName = ""  # track the data file expected to be uploaded to S3
+
+        account_name = 'fd2d01cd892f844eeaa2273'
+        url = "http://192.168.0.21/azure_account_key.txt"
+        response = requests.get(url)
+        account_key = response.text.strip()
+        container_name = 'td-test'
+
+        self.azure_class = Azure(account_name, account_key, container_name)
+
+    # index = eutil.cpuRand(20) + 1
+    bucketName = "td-test"
+    updatecfgDict = {
+        "supportVnodes": "1000",
+        's3EndPoint': 'http://192.168.1.49',
+        's3AccessKey': 'FlIOwdr5HAnsMyEZ6FBwSPE5:87x1tZJll1SaK4hoiglC8zPRhDgbMeW6ufQqEt8',
+        's3BucketName': f'{bucketName}',
+        's3PageCacheSize': '10240',
+        "s3UploadDelaySec": "10",
+        's3MigrateIntervalSec': '600',
+        's3MigrateEnabled': '1'
+    }
+
+    tdLog.info(f"assign bucketName is {bucketName}\n")
+    maxFileSize = (128 + 10) * 1024 * 1024  # add 10M buffer
+
+    def insertData(self):
+        tdLog.info(f"insert data.")
+        # taosBenchmark run
+        json = etool.curFile(__file__, "s3Basic.json")
+        etool.benchMark(json=json)
+
+        tdSql.execute(f"use {self.db}")
+        # comes from s3Basic.json
+        self.childtable_count = 6
+        self.insert_rows = 2000000
+        self.timestamp_step = 100
+
+    def createStream(self, sname):
+        sql = f"create stream {sname} fill_history 1 into stm1 as select count(*) from {self.db}.{self.stb} interval(10s);"
+        tdSql.execute(sql)
+
+    def migrateDbS3(self):
+        sql = f"s3migrate database {self.db}"
+        tdSql.execute(sql, show=True)
+
+    def checkDataFile(self, lines, maxFileSize):
+        # ls -l
+        # -rwxrwxrwx 1 root root 41652224 Apr 17 14:47 vnode2/tsdb/v2f1974ver47.3.data
+        overCnt = 0
+        for line in lines:
+            cols = line.split()
+            fileSize = int(cols[4])
+            fileName = cols[8]
+            # print(f" filesize={fileSize} fileName={fileName} line={line}")
+            if fileSize > maxFileSize:
+                tdLog.info(f"error, {fileSize} over max size({maxFileSize}) {fileName}\n")
+                overCnt += 1
+                self.fileName = fileName
+            else:
+                tdLog.info(f"{fileName}({fileSize}) check size passed.")
+
+        return overCnt
+
+    def checkUploadToS3(self):
+        rootPath = sc.clusterRootPath()
+        cmd = f"ls -l {rootPath}/dnode*/data/vnode/vnode*/tsdb/*.data"
+        tdLog.info(cmd)
+        loop = 0
+        rets = []
+        overCnt = 0
+        while loop < 200:
+            time.sleep(3)
+
+            # check upload to s3
+            rets = eos.runRetList(cmd)
+            cnt = len(rets)
+            if cnt == 0:
+                overCnt = 0
+                tdLog.info("All data files have been uploaded to the server.")
+                break
+            overCnt = self.checkDataFile(rets, self.maxFileSize)
+            if overCnt == 0:
+                uploadOK = True
+                tdLog.info(f"All data files({len(rets)}) size below {self.maxFileSize}, check upload to s3 ok.")
+                break
+
tdLog.info(f"loop={loop} no upload {overCnt} data files wait 3s retry ...") + if loop == 3: + sc.dnodeStop(1) + time.sleep(2) + sc.dnodeStart(1) + loop += 1 + # migrate + self.migrateDbS3() + + # check can pass + if overCnt > 0: + tdLog.exit(f"s3 have {overCnt} files over size.") + + def doAction(self): + tdLog.info(f"do action.") + + self.flushDb(show=True) + # self.compactDb(show=True) + + # sleep 70s + self.migrateDbS3() + + # check upload to s3 + self.checkUploadToS3() + + def check_azure_exist(self): + result = self.azure_class.blob_list() + if not self.fileName: + tdLog.exit("cannot find S3 file") + datafile = self.fileName.split("/")[-1].split(".")[0] + for blob in result["Blobs"]["Blob"]: + if datafile in blob["Name"]: + tdLog.info("Successfully found the file %s in Azure" % self.fileName) + break + else: + tdLog.exit("failed found the file %s in Azure" % self.fileName) + + def check_azure_not_exist(self): + datafile = self.fileName.split("/")[-1].split(".")[0] + + result = self.azure_class.blob_list() + for blob in result["Blobs"]["Blob"]: + if datafile in blob["Name"]: + tdLog.exit("failed delete the file %s in Azure" % self.fileName) + break + else: + tdLog.info("Successfully delete the file %s in Azure" % self.fileName) + + def checkStreamCorrect(self): + sql = f"select count(*) from {self.db}.stm1" + count = 0 + for i in range(120): + tdSql.query(sql) + count = tdSql.getData(0, 0) + if count == 100000 or count == 100001: + return True + time.sleep(1) + + tdLog.exit(f"stream count is not expect . expect = 100000 or 100001 real={count} . sql={sql}") + + def checkCreateDb(self, keepLocal, chunkSize, compact): + # keyword + kw1 = kw2 = kw3 = "" + if keepLocal is not None: + kw1 = f"s3_keeplocal {keepLocal}" + if chunkSize is not None: + kw2 = f"s3_chunksize {chunkSize}" + if compact is not None: + kw3 = f"s3_compact {compact}" + + sql = f" create database db1 vgroups 1 duration 1h {kw1} {kw2} {kw3}" + tdSql.execute(sql, show=True) + # sql = f"select name,s3_keeplocal,s3_chunksize,s3_compact from information_schema.ins_databases where name='db1';" + sql = f"select * from information_schema.ins_databases where name='db1';" + tdSql.query(sql) + # 29 30 31 -> chunksize keeplocal compact + if chunkSize is not None: + tdSql.checkData(0, 29, chunkSize) + if keepLocal is not None: + keepLocalm = keepLocal * 24 * 60 + tdSql.checkData(0, 30, f"{keepLocalm}m") + if compact is not None: + tdSql.checkData(0, 31, compact) + sql = "drop database db1" + tdSql.execute(sql) + + def checkExcept(self): + # errors + sqls = [ + f"create database db2 s3_keeplocal -1", + f"create database db2 s3_keeplocal 0", + f"create database db2 s3_keeplocal 365001", + f"create database db2 s3_chunksize -1", + f"create database db2 s3_chunksize 0", + f"create database db2 s3_chunksize 900000000", + f"create database db2 s3_compact -1", + f"create database db2 s3_compact 100", + f"create database db2 duration 1d s3_keeplocal 1d" + ] + tdSql.errors(sqls) + + def checkBasic(self): + # create db + keeps = [1, 256, 1024, 365000, None] + chunks = [131072, 600000, 820000, 1048576, None] + comps = [0, 1, None] + + for keep in keeps: + for chunk in chunks: + for comp in comps: + self.checkCreateDb(keep, chunk, comp) + + # --checks3 + idx = 1 + taosd = sc.taosdFile(idx) + cfg = sc.dnodeCfgPath(idx) + cmd = f"{taosd} -c {cfg} --checks3" + + eos.exe(cmd) + # output, error = eos.run(cmd) + # print(lines) + + ''' + tips = [ + "put object s3test.txt: success", + "listing bucket ci-bucket: success", + "get object s3test.txt: 
success", + "delete object s3test.txt: success" + ] + pos = 0 + for tip in tips: + pos = output.find(tip, pos) + #if pos == -1: + # tdLog.exit(f"checks3 failed not found {tip}. cmd={cmd} output={output}") + ''' + + # except + self.checkExcept() + + # + def preDb(self, vgroups): + cnt = int(time.time()) % 2 + 1 + for i in range(cnt): + vg = eutil.cpuRand(9) + 1 + sql = f"create database predb vgroups {vg}" + tdSql.execute(sql, show=True) + sql = "drop database predb" + tdSql.execute(sql, show=True) + + # history + def insertHistory(self): + tdLog.info(f"insert history data.") + # taosBenchmark run + json = etool.curFile(__file__, "s3Basic1.json") + etool.benchMark(json=json) + + # come from s3_basic.json + self.insert_rows += self.insert_rows / 4 + self.timestamp_step = 50 + + # delete + def checkDelete(self): + # del 1000 rows + start = 1600000000000 + drows = 200 + for i in range(1, drows, 2): + sql = f"from {self.db}.{self.stb} where ts = {start + i * 500}" + tdSql.execute("delete " + sql, show=True) + tdSql.query("select * " + sql) + tdSql.checkRows(0) + + # delete all 500 step + self.flushDb() + self.compactDb() + self.insert_rows -= drows / 2 + sql = f"select count(*) from {self.db}.{self.stb}" + tdSql.checkAgg(sql, self.insert_rows * self.childtable_count) + + # delete 10W rows from 100000 + drows = 100000 + sdel = start + 100000 * self.timestamp_step + edel = start + 100000 * self.timestamp_step + drows * self.timestamp_step + sql = f"from {self.db}.{self.stb} where ts >= {sdel} and ts < {edel}" + tdSql.execute("delete " + sql, show=True) + tdSql.query("select * " + sql) + tdSql.checkRows(0) + + self.insert_rows -= drows + sql = f"select count(*) from {self.db}.{self.stb}" + tdSql.checkAgg(sql, self.insert_rows * self.childtable_count) + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + self.sname = "stream1" + if eos.isArm64Cpu(): + tdLog.success(f"{__file__} arm64 ignore executed") + else: + + self.preDb(10) + + # insert data + self.insertData() + + # creat stream + self.createStream(self.sname) + + # check insert data correct + # self.checkInsertCorrect() + + # save + self.snapshotAgg() + + # do action + self.doAction() + # check azure data exist + self.check_azure_exist() + # check save agg result correct + self.checkAggCorrect() + + # check insert correct again + self.checkInsertCorrect() + + # check stream correct and drop stream + # self.checkStreamCorrect() + + # drop stream + self.dropStream(self.sname) + + # # insert history disorder data + # self.insertHistory() + # + # # checkBasic + # self.checkBasic() + + # self.checkInsertCorrect() + # self.snapshotAgg() + # self.doAction() + # self.checkAggCorrect() + # self.checkInsertCorrect(difCnt=self.childtable_count * 1499999) + # self.checkDelete() + # self.doAction() + + # drop database and free s3 file + self.dropDb() + # check azure data not exist + self.check_azure_not_exist() + + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())