Added initial code for the new perf_gen tool

parent 456ea712b3
commit 1d7fd5aec4

perf_gen.py
@@ -0,0 +1,480 @@
#!/usr/bin/python3.8

from abc import abstractmethod

import time
from datetime import datetime

from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi
from influxdb_client.client.write_api import SYNCHRONOUS

import argparse
import textwrap
import subprocess
import sys

import taos

from crash_gen.crash_gen_main import Database, TdSuperTable
from crash_gen.service_manager import TdeInstance

from crash_gen.shared.config import Config
from crash_gen.shared.db import DbConn
from crash_gen.shared.misc import Dice, Logging, Helper
from crash_gen.shared.types import TdDataType


# NUM_PROCESSES = 10
# NUM_REPS = 1000

tick = int(time.time() - 5000000.0)  # for now we will create at most 5M records
value = 101

DB_NAME = 'mydb'
TIME_SERIES_NAME = 'widget'

MAX_SHELF = 500  # shelf number runs up to this, non-inclusive
ITEMS_PER_SHELF = 5
BATCH_SIZE = 2000  # number of data points per write request

# None_RW:
# INFLUX_TOKEN='RRzVQZs8ERCpV9cS2RXqgtM_Y6FEZuJ7Tuk0aHtZItFTfcM9ajixtGDhW8HzqNIBmG3hmztw-P4sHOstfJvjFA=='
# DevOrg_RW:
# INFLUX_TOKEN='o1P8sEhBmXKhxBmNuiCyOUKv8d7qm5wUjMff9AbskBu2LcmNPQzU77NrAn5hDil8hZ0-y1AGWpzpL-4wqjFdkA=='
# DevOrg_All_Access:
INFLUX_TOKEN='T2QTr4sloJhINH_oSrwSS-WIIZYjDfD123NK4ou3b7ajRs0c0IphCh3bNc0OsDZQRW1HyCby7opdEndVYFGTWQ=='
INFLUX_ORG="DevOrg"
INFLUX_BUCKET="Bucket01"

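# Sizing note, derived from the constants above: each rack spans
# MAX_SHELF * ITEMS_PER_SHELF = 2,500 serial numbers, so a 5M-point write run
# covers 5,000,000 / 2,500 = 2,000 racks.
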
def writeTaosBatch(dbc, tblName):
    # Database.setupLastTick()
    global value  # incremented below; timestamps come from Database.getNextTick()

    data = []
    for i in range(0, 100):
        data.append("('{}', {})".format(Database.getNextTick(), value))
        value += 1

    # Join with a space so the rows form one valid multi-row INSERT statement
    sql = "INSERT INTO {} VALUES {}".format(tblName, ' '.join(data))
    dbc.execute(sql)

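# For illustration, one call above emits a single 100-row statement of the form
# (timestamps are placeholders for what Database.getNextTick() produces):
#   INSERT INTO <tblName> VALUES ('2021-01-01 00:00:00.000', 101) ('2021-01-01 00:00:00.001', 102) ...
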
class PerfGenError(taos.error.ProgrammingError):
    pass


class Benchmark():

    # @classmethod
    # def create(cls, dbType):
    #     if dbType == 'taos':
    #         return TaosBenchmark()
    #     elif dbType == 'influx':
    #         return InfluxBenchmark()
    #     else:
    #         raise RuntimeError("Unknown DB type: {}".format(dbType))

    def __init__(self, dbType, loopCount=0):
        self._dbType = dbType
        self._setLoopCount(loopCount)

    def _setLoopCount(self, loopCount):
        cfgLoopCount = Config.getConfig().loop_count
        if loopCount == 0:  # use the configured count
            self._loopCount = cfgLoopCount
        else:
            if cfgLoopCount:
                Logging.warning("Ignoring configured loop count ({}) for a fixed-loop-count benchmark".format(cfgLoopCount))
            self._loopCount = loopCount

    @abstractmethod
    def doIterate(self):
        '''
        Execute the benchmark directly, without invoking sub-processes,
        effectively using one execution thread.
        '''
        pass

    @abstractmethod
    def prepare(self):
        '''
        Preparation needed to run a certain benchmark
        '''
        pass

    @abstractmethod
    def execute(self):
        '''
        Actually execute the benchmark
        '''
        Logging.warning("Unexpected execution")

    @property
    def name(self):
        return self.__class__.__name__

    def run(self):
        print("Running benchmark: {}, class={} ...".format(self.name, self.__class__))
        startTime = time.time()

        # Prepare to execute the benchmark
        self.prepare()

        # Actually execute the benchmark
        self.execute()

        # if Config.getConfig().iterate_directly: # execute directly
        #     Logging.debug("Iterating...")
        #     self.doIterate()
        # else:
        #     Logging.debug("Executing via sub process...")
        #     startTime = time.time()
        #     self.prepare()
        #     self.spawnProcesses()
        #     self.waitForProcesses()
        #     duration = time.time() - startTime
        #     Logging.info("Benchmark execution completed in {:.3f} seconds".format(duration))
        Logging.info("Benchmark {} finished in {:.3f} seconds".format(
            self.name, time.time() - startTime))

    def spawnProcesses(self):
        self._subProcs = []
        for j in range(0, Config.getConfig().subprocess_count):
            ON_POSIX = 'posix' in sys.builtin_module_names
            tblName = 'cars_reg_{}'.format(j)
            cmdLineStr = './perf_gen.sh -t {} -i -n {} -l {}'.format(
                self._dbType,
                tblName,
                Config.getConfig().loop_count
            )
            if Config.getConfig().debug:
                cmdLineStr += ' -d'
            subProc = subprocess.Popen(cmdLineStr,
                                       shell=True,
                                       close_fds=ON_POSIX)
            self._subProcs.append(subProc)

    def waitForProcesses(self):
        for sp in self._subProcs:
            sp.wait(300)

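# For subprocess_count = 2 and the default loop count of 1000, spawnProcesses()
# above builds command lines like the following ('-d' is appended in debug mode):
#   ./perf_gen.sh -t taos -i -n cars_reg_0 -l 1000
#   ./perf_gen.sh -t taos -i -n cars_reg_1 -l 1000
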
class TaosBenchmark(Benchmark):

    def __init__(self, loopCount):
        super().__init__('taos', loopCount)
        # self._dbType = 'taos'
        tInst = TdeInstance()
        self._dbc = DbConn.createNative(tInst.getDbTarget())
        self._dbc.open()
        self._sTable = TdSuperTable(TIME_SERIES_NAME + '_s', DB_NAME)

    def doIterate(self):
        tblName = Config.getConfig().target_table_name
        print("Benchmarking TAOS database (1 pass) for: {}".format(tblName))
        self._dbc.execute("USE {}".format(DB_NAME))

        self._sTable.ensureRegTable(None, self._dbc, tblName)
        try:
            lCount = Config.getConfig().loop_count
            print("({})".format(lCount))
            for i in range(0, lCount):
                writeTaosBatch(self._dbc, tblName)
        except taos.error.ProgrammingError as err:
            Logging.error("Failed to write batch: {}".format(err))

    def prepare(self):
        self._dbc.execute("CREATE DATABASE IF NOT EXISTS {}".format(DB_NAME))
        self._dbc.execute("USE {}".format(DB_NAME))
        # Create the super table
        self._sTable.drop(self._dbc, True)
        self._sTable.create(self._dbc,
                            {'ts': TdDataType.TIMESTAMP,
                             'temperature': TdDataType.INT,
                             'pressure': TdDataType.INT,
                             'notes': TdDataType.BINARY200
                             },
                            {'rack': TdDataType.INT,
                             'shelf': TdDataType.INT,
                             'barcode': TdDataType.BINARY16
                             })

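    # For reference, a rough sketch of the DDL the create() call above amounts to
    # (assuming TdSuperTable renders the TdDataType names like this):
    #   CREATE TABLE widget_s (ts TIMESTAMP, temperature INT, pressure INT, notes BINARY(200))
    #       TAGS (rack INT, shelf INT, barcode BINARY(16))
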
    def execSql(self, sql):
        try:
            self._dbc.execute(sql)
        except taos.error.ProgrammingError as err:
            Logging.warning("SQL Error: 0x{:X}, {}, SQL: {}".format(
                Helper.convertErrno(err.errno), err.msg, sql))
            raise

    def executeWrite(self):
        # Sample: INSERT INTO t1 USING st TAGS(1) VALUES(now, 1) t2 USING st TAGS(2) VALUES(now, 2)
        sqlPrefix = "INSERT INTO "
        dataTemplate = "{} USING {} TAGS({},{},'barcode_{}') VALUES('{}',{},{},'{}') "

        stName = self._sTable.getName()

        # rackSize = 10 # shelves per rack
        # shelfSize = 100 # items per shelf
        batchCount = self._loopCount // BATCH_SIZE
        lastRack = 0
        for i in range(batchCount):
            sql = sqlPrefix
            for j in range(BATCH_SIZE):
                n = i * BATCH_SIZE + j  # serial number
                # values first
                # rtName = 'rt_' + str(n) # table name contains serial number, has info
                temperature = 20 + (n % 10)
                pressure = 70 + (n % 10)
                # tags
                shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF  # shelf number
                rack = n // (ITEMS_PER_SHELF * MAX_SHELF)  # rack number
                barcode = rack + shelf
                # table name
                tableName = "reg_" + str(rack) + '_' + str(shelf)
                # now the SQL
                sql += dataTemplate.format(tableName, stName,  # table name
                                           rack, shelf, barcode,  # tags
                                           Database.getNextTick(), temperature, pressure, 'xxx')  # values
                lastRack = rack
            self.execSql(sql)
        Logging.info("Last Rack: {}".format(lastRack))

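# A worked example of the serial-number mapping above, with ITEMS_PER_SHELF = 5
# and MAX_SHELF = 500: n = 2503 gives shelf = (2503 // 5) % 500 = 0 and
# rack = 2503 // 2500 = 1, i.e. the 2504th data point lands on shelf 0 of
# rack 1, in table reg_1_0.
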
class TaosWriteBenchmark(TaosBenchmark):
    def execute(self):
        self.executeWrite()

class Taos100kWriteBenchmark(TaosWriteBenchmark):
    def __init__(self):
        super().__init__(100*1000)

class Taos10kWriteBenchmark(TaosWriteBenchmark):
    def __init__(self):
        super().__init__(10*1000)

class Taos1mWriteBenchmark(TaosWriteBenchmark):
    def __init__(self):
        super().__init__(1000*1000)

class Taos5mWriteBenchmark(TaosWriteBenchmark):
    def __init__(self):
        super().__init__(5*1000*1000)

class Taos1kQueryBenchmark(TaosBenchmark):
    def __init__(self):
        super().__init__(1000)

class Taos1MCreationBenchmark(TaosBenchmark):
    def __init__(self):
        super().__init__(1000000)

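# These thin subclasses exist so a benchmark can be selected by name:
# e.g. '-b Taos10kWrite' resolves to Taos10kWriteBenchmark via the
# globals() lookup in main() below.

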
class InfluxBenchmark(Benchmark):
    def __init__(self, loopCount):
        super().__init__('influx', loopCount)
        # self._dbType = 'influx'

        # self._client = InfluxDBClient(host='localhost', port=8086)

    # def _writeBatch(self, tblName):
    #     global value, tick
    #     data = []
    #     for i in range(0, 100):
    #         line = "{},device={} value={} {}".format(
    #             TIME_SERIES_NAME,
    #             tblName,
    #             value,
    #             tick*1000000000)
    #         # print(line)
    #         data.append(line)
    #         value += 1
    #         tick += 1

    #     self._client.write(data, {'db':DB_NAME}, protocol='line')

    def executeWrite(self):
        global tick  # influx tick #TODO refactor

        lineTemplate = TIME_SERIES_NAME + ",rack={},shelf={},barcode='barcode_{}' temperature={},pressure={} {}"

        write_api = self._client.write_api(write_options=SYNCHRONOUS)  # reused across batches
        batchCount = self._loopCount // BATCH_SIZE
        for i in range(batchCount):
            lineBatch = []
            for j in range(BATCH_SIZE):
                n = i * BATCH_SIZE + j  # serial number
                # values first
                # rtName = 'rt_' + str(n) # table name contains serial number, has info
                temperature = 20 + (n % 10)
                pressure = 70 + (n % 10)
                # tags
                shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF  # shelf number
                rack = n // (ITEMS_PER_SHELF * MAX_SHELF)  # rack number
                barcode = rack + shelf
                # now the line-protocol line
                line = lineTemplate.format(
                    rack, shelf, barcode,  # tags
                    temperature, pressure,  # values
                    tick * 1000000000)
                tick += 1
                lineBatch.append(line)
            write_api.write(INFLUX_BUCKET, INFLUX_ORG, lineBatch)
            # self._client.write(lineBatch, {'db':DB_NAME}, protocol='line')

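    # For illustration, with rack=1, shelf=3 and tick=1600000000 the template
    # above yields this line-protocol string (values here are hypothetical):
    #   widget,rack=1,shelf=3,barcode='barcode_4' temperature=23,pressure=73 1600000000000000000
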
    # def doIterate(self):
    #     tblName = Config.getConfig().target_table_name
    #     print("Benchmarking INFLUX database (1 pass) for: {}".format(tblName))

    #     for i in range(0, Config.getConfig().loop_count):
    #         self._writeBatch(tblName)

    def _getOrgIdByName(self, orgName):
        """Find an organization by name and return its ID."""
        orgApi = self._client.organizations_api()
        orgs = orgApi.find_organizations()
        for org in orgs:
            if org.name == orgName:
                return org.id
        raise PerfGenError("Org not found with name: {}".format(orgName))

    def _fetchAuth(self):
        authApi = self._client.authorizations_api()
        auths = authApi.find_authorizations()
        for auth in auths:
            if auth.token == INFLUX_TOKEN:
                return auth
        raise PerfGenError("No proper auth found")

    def _verifyPermissions(self, perms: list):
        if perms:
            return  # OK
        raise PerfGenError("No permission found")

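    # Note: _verifyPermissions() above only checks that the auth carries a
    # non-empty permission list; it does not verify bucket-level read/write scope.
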
    def prepare(self):
        self._client = InfluxDBClient(
            url="http://127.0.0.1:8086",
            token=INFLUX_TOKEN,
            org=INFLUX_ORG)

        auth = self._fetchAuth()

        self._verifyPermissions(auth.permissions)

        bktApi = self._client.buckets_api()
        # Delete the bucket if it already exists
        bkt = bktApi.find_bucket_by_name(INFLUX_BUCKET)
        if bkt:
            bktApi.delete_bucket(bkt)
        # Recreate it
        orgId = self._getOrgIdByName(INFLUX_ORG)
        bktApi.create_bucket(bucket=None, bucket_name=INFLUX_BUCKET, org_id=orgId)

        # self._client.drop_database(DB_NAME)
        # self._client.create_database(DB_NAME)
        # self._client.switch_database(DB_NAME)

class InfluxWriteBenchmark(InfluxBenchmark):
    def execute(self):
        return self.executeWrite()

class Influx10kWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(10*1000)

class Influx100kWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(100*1000)

class Influx1mWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(1000*1000)

class Influx5mWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(5*1000*1000)

def _buildCmdLineParser():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Performance Benchmarking Tool
            ---------------------------------------------------------------------

            '''))

    parser.add_argument(
        '-b',
        '--benchmark-name',
        action='store',
        default='Taos1kQuery',
        type=str,
        help='Benchmark to use (default: Taos1kQuery)')

    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')

    parser.add_argument(
        '-i',
        '--iterate-directly',
        action='store_true',
        help='Execute operations directly, without sub-processes (default: false)')

    parser.add_argument(
        '-l',
        '--loop-count',
        action='store',
        default=1000,
        type=int,
        help='Number of loops to perform, 100 operations per loop (default: 1000)')

    parser.add_argument(
        '-n',
        '--target-table-name',
        action='store',
        default=None,
        type=str,
        help='Regular table name in target DB (default: None)')

    parser.add_argument(
        '-s',
        '--subprocess-count',
        action='store',
        default=4,
        type=int,
        help='Number of sub-processes to spawn (default: 4)')

    parser.add_argument(
        '-t',
        '--target-database',
        action='store',
        default='taos',
        type=str,
        help='Benchmark target: taos, influx (default: taos)')

    return parser

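# Illustrative invocations (see also the benchmark-name note above):
#   ./perf_gen.sh -b Taos100kWrite -d
#   ./perf_gen.sh -b Influx10kWrite
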
def main():
    parser = _buildCmdLineParser()
    Config.init(parser)
    Logging.clsInit(Config.getConfig().debug)
    Dice.seed(0)  # initial seeding of dice

    bName = Config.getConfig().benchmark_name
    bClass = globals()[bName + 'Benchmark']
    bm = bClass()  # Benchmark object
    bm.run()

    # bm = Benchmark.create(Config.getConfig().target_database)
    # bm.run()

if __name__ == "__main__":
    main()


perf_gen.sh
@@ -0,0 +1,60 @@
#!/bin/bash

# This is the launcher script for the new perf_gen performance benchmarking tool
#
# PREPARATION
#
# 1. Build and compile the TDengine source code that comes with this script, in the same directory tree
# 2. Please follow the directions in our README.md, and build TDengine in the build/ directory
# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg
# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg
# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above
# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils
#
# RUNNING THIS SCRIPT
#
# This script assumes the source code directory is intact, and that the binaries have been built in the
# build/ directory; as such, it will load the Python libraries in the directory tree, and also load
# the TDengine client shared library (.so) file in the build/ directory, as evidenced in the env
# variables below.
#
# Running the script is simple; all command-line parameters are passed through
# to the Python tool (run with -h to see the available flags).
#
# Happy Benchmarking...


# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory
EXEC_DIR=$(dirname "$0")
if [[ $EXEC_DIR != "." ]]
then
    echo "ERROR: Please execute $(basename "$0") in its own directory (for now anyway, pardon the dust)"
    exit 1
fi

CURR_DIR=$(pwd)
IN_TDINTERNAL="community"
if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then
    TAOS_DIR=$CURR_DIR/../../..
    TAOSD_DIR=$(find "$TAOS_DIR" -name "taosd" | grep bin | head -n1)
    LIB_DIR=$(echo "$TAOSD_DIR" | rev | cut -d '/' -f 3,4,5,6,7 | rev)/lib
else
    TAOS_DIR=$CURR_DIR/../..
    TAOSD_DIR=$(find "$TAOS_DIR" -name "taosd" | grep bin | head -n1)
    LIB_DIR=$(echo "$TAOSD_DIR" | rev | cut -d '/' -f 3,4,5,6 | rev)/lib
fi

# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC=python3.8

# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)

# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR

# Now we are all set; note that we pass all parameters through to the Python tool
PERF_GEN_EXEC=perf_gen.py
$PYTHON_EXEC $PERF_GEN_EXEC "$@"