225 lines
7.6 KiB
Python
225 lines
7.6 KiB
Python
###################################################################
|
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
|
# All rights reserved.
|
|
#
|
|
# This file is proprietary and confidential to TAOS Technologies.
|
|
# No part of this file may be reproduced, stored, transmitted,
|
|
# disclosed or used in any form or by any means other than as
|
|
# expressly provided by the written permission from Jianhui Tao
|
|
#
|
|
###################################################################
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
sys.path.insert(0, os.getcwd())
|
|
from fabric import Connection
|
|
from util.sql import *
|
|
from util.log import *
|
|
import taos
|
|
import random
|
|
import threading
|
|
import logging
|
|
|
|
class Node:
|
|
def __init__(self, index, username, hostIP, hostName, password, homeDir):
|
|
self.index = index
|
|
self.username = username
|
|
self.hostIP = hostIP
|
|
self.hostName = hostName
|
|
self.homeDir = homeDir
|
|
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
|
|
|
|
def startTaosd(self):
|
|
try:
|
|
self.conn.run("sudo systemctl start taosd")
|
|
except Exception as e:
|
|
print("Start Taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def stopTaosd(self):
|
|
try:
|
|
self.conn.run("sudo systemctl stop taosd")
|
|
except Exception as e:
|
|
print("Stop Taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def restartTaosd(self):
|
|
try:
|
|
self.conn.run("sudo systemctl restart taosd")
|
|
except Exception as e:
|
|
print("Stop Taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def removeTaosd(self):
|
|
try:
|
|
self.conn.run("rmtaos")
|
|
except Exception as e:
|
|
print("remove taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def installTaosd(self, packagePath):
|
|
self.conn.put(packagePath, self.homeDir)
|
|
self.conn.cd(self.homeDir)
|
|
self.conn.run("tar -zxf $(basename '%s')" % packagePath)
|
|
with self.conn.cd("TDengine-enterprise-server"):
|
|
self.conn.run("yes|./install.sh")
|
|
|
|
def configTaosd(self, taosConfigKey, taosConfigValue):
|
|
self.conn.run("sudo echo '%s %s' >> %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg"))
|
|
|
|
def removeTaosConfig(self, taosConfigKey, taosConfigValue):
|
|
self.conn.run("sudo sed -in-place -e '/%s %s/d' %s" % (taosConfigKey, taosConfigValue, "/etc/taos/taos.cfg"))
|
|
|
|
def configHosts(self, ip, name):
|
|
self.conn.run("echo '%s %s' >> %s" % (ip, name, '/etc/hosts'))
|
|
|
|
def removeData(self):
|
|
try:
|
|
self.conn.run("sudo rm -rf /var/lib/taos/*")
|
|
except Exception as e:
|
|
print("remove taosd data error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def removeLog(self):
|
|
try:
|
|
self.conn.run("sudo rm -rf /var/log/taos/*")
|
|
except Exception as e:
|
|
print("remove taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def removeDataForMnode(self):
|
|
try:
|
|
self.conn.run("sudo rm -rf /var/lib/taos/*")
|
|
except Exception as e:
|
|
print("remove taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
def removeDataForVnode(self, id):
|
|
try:
|
|
self.conn.run("sudo rm -rf /var/lib/taos/vnode%d/*.data" % id)
|
|
except Exception as e:
|
|
print("remove taosd error for node %d " % self.index)
|
|
logging.exception(e)
|
|
|
|
class Nodes:
|
|
def __init__(self):
|
|
self.node1 = Node(1, 'ubuntu', '192.168.1.52', 'node1', 'tbase125!', '/home/ubuntu')
|
|
self.node2 = Node(2, 'ubuntu', '192.168.1.53', 'node2', 'tbase125!', '/home/ubuntu')
|
|
self.node3 = Node(3, 'ubuntu', '192.168.1.54', 'node3', 'tbase125!', '/home/ubuntu')
|
|
|
|
def stopAllTaosd(self):
|
|
self.node1.stopTaosd()
|
|
self.node2.stopTaosd()
|
|
self.node3.stopTaosd()
|
|
|
|
def startAllTaosd(self):
|
|
self.node1.startTaosd()
|
|
self.node2.startTaosd()
|
|
self.node3.startTaosd()
|
|
|
|
def restartAllTaosd(self):
|
|
self.node1.restartTaosd()
|
|
self.node2.restartTaosd()
|
|
self.node3.restartTaosd()
|
|
|
|
def addConfigs(self, configKey, configValue):
|
|
self.node1.configTaosd(configKey, configValue)
|
|
self.node2.configTaosd(configKey, configValue)
|
|
self.node3.configTaosd(configKey, configValue)
|
|
|
|
def removeConfigs(self, configKey, configValue):
|
|
self.node1.removeTaosConfig(configKey, configValue)
|
|
self.node2.removeTaosConfig(configKey, configValue)
|
|
self.node3.removeTaosConfig(configKey, configValue)
|
|
|
|
def removeAllDataFiles(self):
|
|
self.node1.removeData()
|
|
self.node2.removeData()
|
|
self.node3.removeData()
|
|
|
|
class ClusterTest:
|
|
def __init__(self, hostName):
|
|
self.host = hostName
|
|
self.user = "root"
|
|
self.password = "taosdata"
|
|
self.config = "/etc/taos"
|
|
self.dbName = "mytest"
|
|
self.stbName = "meters"
|
|
self.numberOfThreads = 20
|
|
self.numberOfTables = 10000
|
|
self.numberOfRecords = 1000
|
|
self.tbPrefix = "t"
|
|
self.ts = 1538548685000
|
|
self.repeat = 1
|
|
|
|
def connectDB(self):
|
|
self.conn = taos.connect(
|
|
host=self.host,
|
|
user=self.user,
|
|
password=self.password,
|
|
config=self.config)
|
|
|
|
def createSTable(self, replica):
|
|
cursor = self.conn.cursor()
|
|
tdLog.info("drop database if exists %s" % self.dbName)
|
|
cursor.execute("drop database if exists %s" % self.dbName)
|
|
tdLog.info("create database %s replica %d" % (self.dbName, replica))
|
|
cursor.execute("create database %s replica %d" % (self.dbName, replica))
|
|
tdLog.info("use %s" % self.dbName)
|
|
cursor.execute("use %s" % self.dbName)
|
|
tdLog.info("drop table if exists %s" % self.stbName)
|
|
cursor.execute("drop table if exists %s" % self.stbName)
|
|
tdLog.info("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName)
|
|
cursor.execute("create table %s(ts timestamp, current float, voltage int, phase int) tags(id int)" % self.stbName)
|
|
cursor.close()
|
|
|
|
def insertData(self, threadID):
|
|
print("Thread %d: starting" % threadID)
|
|
cursor = self.conn.cursor()
|
|
tablesPerThread = int(self.numberOfTables / self.numberOfThreads)
|
|
baseTableID = tablesPerThread * threadID
|
|
for i in range (tablesPerThread):
|
|
cursor.execute("create table %s%d using %s tags(%d)" % (self.tbPrefix, baseTableID + i, self.stbName, baseTableID + i))
|
|
query = "insert into %s%d values" % (self.tbPrefix, baseTableID + i)
|
|
base = self.numberOfRecords * i
|
|
for j in range(self.numberOfRecords):
|
|
query += "(%d, %f, %d, %d)" % (self.ts + base + j, random.random(), random.randint(210, 230), random.randint(0, 10))
|
|
cursor.execute(query)
|
|
cursor.close()
|
|
print("Thread %d: finishing" % threadID)
|
|
|
|
def run(self):
|
|
threads = []
|
|
tdLog.info("Inserting data")
|
|
for i in range(self.numberOfThreads):
|
|
thread = threading.Thread(target=self.insertData, args=(i,))
|
|
threads.append(thread)
|
|
thread.start()
|
|
|
|
for i in range(self.numberOfThreads):
|
|
threads[i].join()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|