Merge pull request #6079 from taosdata/feature/crash_gen2
Data generation script for TD-4133
commit 545749c003
@@ -1,6 +1,7 @@
 from .connection import TDengineConnection
 from .cursor import TDengineCursor
+from .error import Error
 
 # Globals
 threadsafety = 0
 
@@ -0,0 +1,2 @@
+# Helpful Ref: https://stackoverflow.com/questions/24100558/how-can-i-split-a-module-into-multiple-files-without-breaking-a-backwards-compa/24100645
+from crash_gen.service_manager import ServiceManager, TdeInstance, TdeSubProcess
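The two-line file above applies the module-splitting trick from the referenced StackOverflow answer: a thin shim re-exports public names so old import sites keep working after the implementation moves into a package. A self-contained sketch of the pattern (module names here are illustrative, not from this PR):

```python
# Demonstrate the re-export trick with a dynamically created module.
import sys
import types

impl = types.ModuleType("impl")      # stands in for crash_gen.service_manager

class ServiceManager:                # the "real" implementation
    pass

impl.ServiceManager = ServiceManager
sys.modules["impl"] = impl           # register it so import machinery finds it

# The backward-compatibility shim is then just a re-export:
from impl import ServiceManager as SM   # old import sites keep working
assert SM is ServiceManager
```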
@@ -1,6 +1,6 @@
 # -----!/usr/bin/python3.7
 ###################################################################
-# Copyright (c) 2016 by TAOS Technologies, Inc.
+# Copyright (c) 2016-2021 by TAOS Technologies, Inc.
 # All rights reserved.
 #
 # This file is proprietary and confidential to TAOS Technologies.
@@ -15,7 +15,7 @@
 # https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
 from __future__ import annotations
 
-from typing import Set
+from typing import Any, Set, Tuple
 from typing import Dict
 from typing import List
 from typing import Optional # Type hinting, ref: https://stackoverflow.com/questions/19202633/python-3-type-hinting-for-none
@@ -24,29 +24,34 @@ import textwrap
 import time
 import datetime
 import random
-import logging
 import threading
-import copy
 import argparse
-import getopt
 
 import sys
 import os
+import io
 import signal
 import traceback
-import resource
+import requests
 # from guppy import hpy
 import gc
 
-from crash_gen.service_manager import ServiceManager, TdeInstance
-from crash_gen.misc import Logging, Status, CrashGenError, Dice, Helper, Progress
-from crash_gen.db import DbConn, MyTDSql, DbConnNative, DbManager
-import crash_gen.settings
 
 import taos
-import requests
 
-crash_gen.settings.init()
+from .shared.types import TdColumns, TdTags
 
+# from crash_gen import ServiceManager, TdeInstance, TdeSubProcess
+# from crash_gen import ServiceManager, Config, DbConn, DbConnNative, Dice, DbManager, Status, Logging, Helper, \
+#     CrashGenError, Progress, MyTDSql, \
+#     TdeInstance
 
+from .service_manager import ServiceManager, TdeInstance
 
+from .shared.config import Config
+from .shared.db import DbConn, DbManager, DbConnNative, MyTDSql
+from .shared.misc import Dice, Logging, Helper, Status, CrashGenError, Progress
+from .shared.types import TdDataType
 
+# Config.init()
 
 # Require Python 3
 if sys.version_info[0] < 3:
@@ -56,8 +61,8 @@ if sys.version_info[0] < 3:
 
 # Command-line/Environment Configurations, will set a bit later
 # ConfigNameSpace = argparse.Namespace
-gConfig: argparse.Namespace
-gSvcMgr: ServiceManager # TODO: refactor this hack, use dep injection
+# gConfig: argparse.Namespace
+gSvcMgr: Optional[ServiceManager] # TODO: refactor this hack, use dep injection
 # logger: logging.Logger
 gContainer: Container
 
@@ -80,20 +85,20 @@ class WorkerThread:
         self._stepGate = threading.Event()
 
         # Let us have a DB connection of our own
-        if (gConfig.per_thread_db_connection):  # type: ignore
+        if (Config.getConfig().per_thread_db_connection):  # type: ignore
             # print("connector_type = {}".format(gConfig.connector_type))
             tInst = gContainer.defTdeInstance
-            if gConfig.connector_type == 'native':
+            if Config.getConfig().connector_type == 'native':
                 self._dbConn = DbConn.createNative(tInst.getDbTarget())
-            elif gConfig.connector_type == 'rest':
+            elif Config.getConfig().connector_type == 'rest':
                 self._dbConn = DbConn.createRest(tInst.getDbTarget())
-            elif gConfig.connector_type == 'mixed':
+            elif Config.getConfig().connector_type == 'mixed':
                 if Dice.throw(2) == 0: # 1/2 chance
-                    self._dbConn = DbConn.createNative()
+                    self._dbConn = DbConn.createNative(tInst.getDbTarget())
                 else:
-                    self._dbConn = DbConn.createRest()
+                    self._dbConn = DbConn.createRest(tInst.getDbTarget())
             else:
-                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))
+                raise RuntimeError("Unexpected connector type: {}".format(Config.getConfig().connector_type))
 
         # self._dbInUse = False # if "use db" was executed already
 
@@ -122,14 +127,14 @@ class WorkerThread:
         # self.isSleeping = False
         Logging.info("Starting to run thread: {}".format(self._tid))
 
-        if (gConfig.per_thread_db_connection):  # type: ignore
+        if (Config.getConfig().per_thread_db_connection):  # type: ignore
             Logging.debug("Worker thread openning database connection")
             self._dbConn.open()
 
         self._doTaskLoop()
 
         # clean up
-        if (gConfig.per_thread_db_connection):  # type: ignore
+        if (Config.getConfig().per_thread_db_connection):  # type: ignore
             if self._dbConn.isOpen: #sometimes it is not open
                 self._dbConn.close()
             else:
@@ -157,7 +162,7 @@ class WorkerThread:
 
         # Before we fetch the task and run it, let's ensure we properly "use" the database (not needed any more)
         try:
-            if (gConfig.per_thread_db_connection):  # most likely TRUE
+            if (Config.getConfig().per_thread_db_connection):  # most likely TRUE
                 if not self._dbConn.isOpen: # might have been closed during server auto-restart
                     self._dbConn.open()
             # self.useDb() # might encounter exceptions. TODO: catch
@@ -231,7 +236,7 @@ class WorkerThread:
         return self.getDbConn().getQueryResult()
 
     def getDbConn(self) -> DbConn :
-        if (gConfig.per_thread_db_connection):
+        if (Config.getConfig().per_thread_db_connection):
             return self._dbConn
         else:
             return self._tc.getDbManager().getDbConn()
@@ -253,7 +258,7 @@ class ThreadCoordinator:
         self._pool = pool
         # self._wd = wd
         self._te = None # prepare for every new step
-        self._dbManager = dbManager
+        self._dbManager = dbManager # type: Optional[DbManager] # may be freed
         self._executedTasks: List[Task] = [] # in a given step
         self._lock = threading.RLock() # sync access for a few things
 
@@ -265,9 +270,13 @@ class ThreadCoordinator:
         self._stepStartTime = None # Track how long it takes to execute each step
 
     def getTaskExecutor(self):
+        if self._te is None:
+            raise CrashGenError("Unexpected empty TE")
         return self._te
 
     def getDbManager(self) -> DbManager:
+        if self._dbManager is None:
+            raise ChildProcessError("Unexpected empty _dbManager")
         return self._dbManager
 
     def crossStepBarrier(self, timeout=None):
@@ -278,7 +287,7 @@ class ThreadCoordinator:
         self._execStats.registerFailure("User Interruption")
 
     def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
-        maxSteps = gConfig.max_steps # type: ignore
+        maxSteps = Config.getConfig().max_steps # type: ignore
         if self._curStep >= (maxSteps - 1): # maxStep==10, last curStep should be 9
             return True
         if self._runStatus != Status.STATUS_RUNNING:
@@ -383,7 +392,7 @@ class ThreadCoordinator:
         hasAbortedTask = False
         workerTimeout = False
         while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
-            if not gConfig.debug: # print this only if we are not in debug mode
+            if not Config.getConfig().debug: # print this only if we are not in debug mode
                 Progress.emit(Progress.STEP_BOUNDARY)
                 # print(".", end="", flush=True)
             # if (self._curStep % 2) == 0: # print memory usage once every 10 steps
@@ -468,7 +477,7 @@ class ThreadCoordinator:
         self._pool = None
         self._te = None
         self._dbManager = None
-        self._executedTasks = None
+        self._executedTasks = []
         self._lock = None
         self._stepBarrier = None
         self._execStats = None
@@ -507,18 +516,18 @@ class ThreadCoordinator:
         ''' Initialize multiple databases, invoked at __ini__() time '''
         self._dbs = [] # type: List[Database]
         dbc = self.getDbManager().getDbConn()
-        if gConfig.max_dbs == 0:
+        if Config.getConfig().max_dbs == 0:
             self._dbs.append(Database(0, dbc))
         else:
             baseDbNumber = int(datetime.datetime.now().timestamp( # Don't use Dice/random, as they are deterministic
-                )*333) % 888 if gConfig.dynamic_db_table_names else 0
-            for i in range(gConfig.max_dbs):
+                )*333) % 888 if Config.getConfig().dynamic_db_table_names else 0
+            for i in range(Config.getConfig().max_dbs):
                 self._dbs.append(Database(baseDbNumber + i, dbc))
 
     def pickDatabase(self):
         idxDb = 0
-        if gConfig.max_dbs != 0 :
-            idxDb = Dice.throw(gConfig.max_dbs) # 0 to N-1
+        if Config.getConfig().max_dbs != 0 :
+            idxDb = Dice.throw(Config.getConfig().max_dbs) # 0 to N-1
         db = self._dbs[idxDb] # type: Database
         return db
 
@@ -562,7 +571,7 @@ class ThreadPool:
             workerThread._thread.join()
 
     def cleanup(self):
-        self.threadList = None # maybe clean up each?
+        self.threadList = [] # maybe clean up each?
 
 # A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
 # for new table names
@@ -672,7 +681,7 @@ class AnyState:
 
     # Each sub state tells us the "info", about itself, so we can determine
     # on things like canDropDB()
-    def getInfo(self):
+    def getInfo(self) -> List[Any]:
         raise RuntimeError("Must be overriden by child classes")
 
     def equals(self, other):
@@ -700,7 +709,7 @@ class AnyState:
     def canDropDb(self):
         # If user requests to run up to a number of DBs,
         # we'd then not do drop_db operations any more
-        if gConfig.max_dbs > 0 or gConfig.use_shadow_db :
+        if Config.getConfig().max_dbs > 0 or Config.getConfig().use_shadow_db :
             return False
         return self._info[self.CAN_DROP_DB]
 
@@ -708,7 +717,7 @@ class AnyState:
         return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]
 
     def canDropFixedSuperTable(self):
-        if gConfig.use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
+        if Config.getConfig().use_shadow_db: # duplicate writes to shaddow DB, in which case let's disable dropping s-table
             return False
         return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]
 
@@ -910,7 +919,7 @@ class StateMechine:
 
     # May be slow, use cautionsly...
     def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
-        def typesToStrings(types):
+        def typesToStrings(types) -> List:
             ss = []
             for t in types:
                 ss.append(t.__name__)
@@ -1029,13 +1038,14 @@ class StateMechine:
 
     # ref:
     # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
-    def _weighted_choice_sub(self, weights):
+    def _weighted_choice_sub(self, weights) -> int:
         # TODO: use our dice to ensure it being determinstic?
         rnd = random.random() * sum(weights)
         for i, w in enumerate(weights):
             rnd -= w
             if rnd < 0:
                 return i
+        raise CrashGenError("Unexpected no choice")
 
 class Database:
     ''' We use this to represent an actual TDengine database inside a service instance,
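`_weighted_choice_sub()` is the classic weighted-random-index algorithm from the linked Eli Bendersky post; the new `raise` makes the "fell off the end" case (possible through float rounding) explicit instead of silently returning `None`. A standalone sketch of the same logic:

```python
import random

def weighted_choice_sub(weights):
    # Throw a dart on [0, sum(weights)) and walk the buckets until it lands.
    rnd = random.random() * sum(weights)
    for i, w in enumerate(weights):
        rnd -= w
        if rnd < 0:
            return i
    raise RuntimeError("Unexpected no choice")  # float rounding edge case

# Indices with weight 6 should be picked ~6x as often as weight 1.
counts = [0, 0, 0]
for _ in range(10000):
    counts[weighted_choice_sub([1, 3, 6])] += 1
print(counts)
```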
@@ -1047,8 +1057,8 @@ class Database:
     '''
     _clsLock = threading.Lock() # class wide lock
     _lastInt = 101 # next one is initial integer
-    _lastTick = 0
-    _lastLaggingTick = 0 # lagging tick, for out-of-sequence (oos) data insertions
+    _lastTick = None # Optional[datetime]
+    _lastLaggingTick = None # Optional[datetime] # lagging tick, for out-of-sequence (oos) data insertions
 
     def __init__(self, dbNum: int, dbc: DbConn): # TODO: remove dbc
         self._dbNum = dbNum # we assign a number to databases, for our testing purpose
@@ -1104,7 +1114,7 @@ class Database:
         t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
         t4 = datetime.datetime.fromtimestamp(
             t3.timestamp() + elSec2) # see explanation above
-        Logging.info("Setting up TICKS to start from: {}".format(t4))
+        Logging.debug("Setting up TICKS to start from: {}".format(t4))
         return t4
 
     @classmethod
@@ -1113,14 +1123,14 @@ class Database:
         Fetch a timestamp tick, with some random factor, may not be unique.
         '''
         with cls._clsLock: # prevent duplicate tick
-            if cls._lastLaggingTick==0 or cls._lastTick==0 : # not initialized
+            if cls._lastLaggingTick is None or cls._lastTick is None : # not initialized
                 # 10k at 1/20 chance, should be enough to avoid overlaps
                 tick = cls.setupLastTick()
                 cls._lastTick = tick
                 cls._lastLaggingTick = tick + datetime.timedelta(0, -60*2) # lagging behind 2 minutes, should catch up fast
                 # if : # should be quite a bit into the future
 
-            if gConfig.mix_oos_data and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
+            if Config.isSet('mix_oos_data') and Dice.throw(20) == 0: # if asked to do so, and 1 in 20 chance, return lagging tick
                 cls._lastLaggingTick += datetime.timedelta(0, 1) # pick the next sequence from the lagging tick sequence
                 return cls._lastLaggingTick
             else: # regular
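The class-level `_lastTick`/`_lastLaggingTick` pair produces a mostly increasing timestamp stream with, when `mix_oos_data` is set, a 1-in-20 chance of emitting from a second sequence that trails two minutes behind, so out-of-sequence inserts get exercised. A simplified standalone sketch of that scheme:

```python
import datetime
import random
import threading

class TickSource:
    _lock = threading.Lock()
    _last = None      # regular, ever-increasing tick
    _lagging = None   # a second sequence trailing 2 minutes behind

    @classmethod
    def next_tick(cls, mix_oos=True):
        with cls._lock:                       # prevent duplicate ticks across threads
            if cls._last is None:             # lazy init of both sequences
                cls._last = datetime.datetime(2012, 1, 1)
                cls._lagging = cls._last - datetime.timedelta(seconds=120)
            if mix_oos and random.randrange(20) == 0:   # 1-in-20: out-of-sequence tick
                cls._lagging += datetime.timedelta(seconds=1)
                return cls._lagging
            cls._last += datetime.timedelta(seconds=1)  # regular path
            return cls._last

print([TickSource.next_tick() for _ in range(5)])
```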
@@ -1302,10 +1312,10 @@ class Task():
                 ]:
             return True # These are the ALWAYS-ACCEPTABLE ones
         # This case handled below already.
-        # elif (errno in [ 0x0B ]) and gConfig.auto_start_service:
+        # elif (errno in [ 0x0B ]) and Settings.getConfig().auto_start_service:
         #     return True # We may get "network unavilable" when restarting service
-        elif gConfig.ignore_errors: # something is specified on command line
-            moreErrnos = [int(v, 0) for v in gConfig.ignore_errors.split(',')]
+        elif Config.getConfig().ignore_errors: # something is specified on command line
+            moreErrnos = [int(v, 0) for v in Config.getConfig().ignore_errors.split(',')]
             if errno in moreErrnos:
                 return True
         elif errno == 0x200 : # invalid SQL, we need to div in a bit more
@@ -1341,7 +1351,7 @@ class Task():
             self._executeInternal(te, wt) # TODO: no return value?
         except taos.error.ProgrammingError as err:
             errno2 = Helper.convertErrno(err.errno)
-            if (gConfig.continue_on_exception): # user choose to continue
+            if (Config.getConfig().continue_on_exception): # user choose to continue
                 self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                     errno2, err, wt.getDbConn().getLastSql()))
                 self._err = err
@@ -1356,7 +1366,7 @@ class Task():
                     self.__class__.__name__,
                     errno2, err, wt.getDbConn().getLastSql())
                 self.logDebug(errMsg)
-                if gConfig.debug:
+                if Config.getConfig().debug:
                     # raise # so that we see full stack
                     traceback.print_exc()
                 print(
@@ -1370,13 +1380,13 @@ class Task():
             self._err = e
             self._aborted = True
             traceback.print_exc()
-        except BaseException as e:
+        except BaseException as e2:
             self.logInfo("Python base exception encountered")
-            self._err = e
+            # self._err = e2 # Exception/BaseException incompatible!
             self._aborted = True
             traceback.print_exc()
-        except BaseException: # TODO: what is this again??!!
-            raise RuntimeError("Punt")
+        # except BaseException: # TODO: what is this again??!!
+        #     raise RuntimeError("Punt")
             # self.logDebug(
             #     "[=] Unexpected exception, SQL: {}".format(
             #         wt.getDbConn().getLastSql()))
@@ -1421,11 +1431,11 @@ class Task():
 class ExecutionStats:
     def __init__(self):
         # total/success times for a task
-        self._execTimes: Dict[str, [int, int]] = {}
+        self._execTimes: Dict[str, List[int]] = {}
         self._tasksInProgress = 0
         self._lock = threading.Lock()
-        self._firstTaskStartTime = None
-        self._execStartTime = None
+        self._firstTaskStartTime = 0.0
+        self._execStartTime = 0.0
         self._errors = {}
         self._elapsedTime = 0.0 # total elapsed time
         self._accRunTime = 0.0 # accumulated run time
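The `0.0` sentinels replace `None` so the timing arithmetic type-checks; `_accRunTime` only accumulates wall time while at least one task is running: the clock starts with the first concurrent task and stops when the in-progress count returns to zero (see the hunk below). A standalone sketch of that accounting, under the assumption that starts and stops are reported per task:

```python
import threading
import time

class RunTimeMeter:
    def __init__(self):
        self._lock = threading.Lock()
        self._in_progress = 0
        self._first_start = 0.0   # 0.0 sentinel instead of None
        self._acc = 0.0           # accumulated "something was running" time

    def task_started(self):
        with self._lock:
            if self._in_progress == 0:      # clock starts with the first task
                self._first_start = time.time()
            self._in_progress += 1

    def task_stopped(self):
        with self._lock:
            self._in_progress -= 1
            if self._in_progress == 0:      # all tasks stopped: bank the time
                self._acc += time.time() - self._first_start
                self._first_start = 0.0

m = RunTimeMeter()
m.task_started(); time.sleep(0.1); m.task_stopped()
print(round(m._acc, 1))
```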
@@ -1470,7 +1480,7 @@ class ExecutionStats:
             self._tasksInProgress -= 1
             if self._tasksInProgress == 0: # all tasks have stopped
                 self._accRunTime += (time.time() - self._firstTaskStartTime)
-                self._firstTaskStartTime = None
+                self._firstTaskStartTime = 0.0
 
     def registerFailure(self, reason):
         self._failed = True
@@ -1554,7 +1564,7 @@ class StateTransitionTask(Task):
     def getRegTableName(cls, i):
         if ( StateTransitionTask._baseTableNumber is None): # Set it one time
             StateTransitionTask._baseTableNumber = Dice.throw(
-                999) if gConfig.dynamic_db_table_names else 0
+                999) if Config.getConfig().dynamic_db_table_names else 0
         return "reg_table_{}".format(StateTransitionTask._baseTableNumber + i)
 
     def execute(self, wt: WorkerThread):
@@ -1574,14 +1584,14 @@ class TaskCreateDb(StateTransitionTask):
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
         # was: self.execWtSql(wt, "create database db")
         repStr = ""
-        if gConfig.num_replicas != 1:
-            # numReplica = Dice.throw(gConfig.max_replicas) + 1 # 1,2 ... N
-            numReplica = gConfig.num_replicas # fixed, always
+        if Config.getConfig().num_replicas != 1:
+            # numReplica = Dice.throw(Settings.getConfig().max_replicas) + 1 # 1,2 ... N
+            numReplica = Config.getConfig().num_replicas # fixed, always
             repStr = "replica {}".format(numReplica)
-        updatePostfix = "update 1" if gConfig.verify_data else "" # allow update only when "verify data" is active
+        updatePostfix = "update 1" if Config.getConfig().verify_data else "" # allow update only when "verify data" is active
         dbName = self._db.getName()
         self.execWtSql(wt, "create database {} {} {} ".format(dbName, repStr, updatePostfix ) )
-        if dbName == "db_0" and gConfig.use_shadow_db:
+        if dbName == "db_0" and Config.getConfig().use_shadow_db:
             self.execWtSql(wt, "create database {} {} {} ".format("db_s", repStr, updatePostfix ) )
 
 class TaskDropDb(StateTransitionTask):
@@ -1614,10 +1624,11 @@ class TaskCreateSuperTable(StateTransitionTask):
         sTable = self._db.getFixedSuperTable() # type: TdSuperTable
         # wt.execSql("use db") # should always be in place
 
         sTable.create(wt.getDbConn(),
-                      {'ts':'TIMESTAMP', 'speed':'INT', 'color':'BINARY(16)'}, {'b':'BINARY(200)', 'f':'FLOAT'},
-                      dropIfExists = True
-                      )
+                      {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT, 'color': TdDataType.BINARY16}, {
+                          'b': TdDataType.BINARY200, 'f': TdDataType.FLOAT},
+                      dropIfExists=True
+                      )
         # self.execWtSql(wt,"create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
         # No need to create the regular tables, INSERT will do that
         # automatically
@@ -1645,9 +1656,7 @@ class TdSuperTable:
         return dbc.existsSuperTable(self._stName)
 
     # TODO: odd semantic, create() method is usually static?
-    def create(self, dbc, cols: dict, tags: dict,
-               dropIfExists = False
-               ):
+    def create(self, dbc, cols: TdColumns, tags: TdTags, dropIfExists = False):
         '''Creating a super table'''
 
         dbName = self._dbName
|
||||||
dbc.execute("DROP TABLE {}".format(fullTableName))
|
dbc.execute("DROP TABLE {}".format(fullTableName))
|
||||||
else: # error
|
else: # error
|
||||||
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
raise CrashGenError("Cannot create super table, already exists: {}".format(self._stName))
|
||||||
|
|
||||||
# Now let's create
|
# Now let's create
|
||||||
sql = "CREATE TABLE {} ({})".format(
|
sql = "CREATE TABLE {} ({})".format(
|
||||||
fullTableName,
|
fullTableName,
|
||||||
",".join(['%s %s'%(k,v) for (k,v) in cols.items()]))
|
",".join(['%s %s'%(k,v.value) for (k,v) in cols.items()]))
|
||||||
if tags is None :
|
if tags :
|
||||||
sql += " TAGS (dummy int) "
|
|
||||||
else:
|
|
||||||
sql += " TAGS ({})".format(
|
sql += " TAGS ({})".format(
|
||||||
",".join(['%s %s'%(k,v) for (k,v) in tags.items()])
|
",".join(['%s %s'%(k,v.value) for (k,v) in tags.items()])
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
sql += " TAGS (dummy int) "
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
|
|
||||||
def getRegTables(self, dbc: DbConn):
|
def getRegTables(self, dbc: DbConn):
|
||||||
|
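With column types now carried as a `TdDataType` enum instead of raw strings, the SQL builder above must take `.value` to recover the type text, and an empty tag dict falls back to a dummy tag since a super table needs at least one. A sketch of the same construction with a stand-in enum (the real `TdDataType` lives in `.shared.types` and is not shown in this diff):

```python
from enum import Enum

class TdDataType(Enum):              # stand-in; mirrors the shape used by the PR
    TIMESTAMP = 'TIMESTAMP'
    INT = 'INT'
    BINARY16 = 'BINARY(16)'
    FLOAT = 'FLOAT'

def build_create_stable(full_name, cols, tags):
    # Columns: "name TYPE" pairs, taking .value to get the SQL type text.
    sql = "CREATE TABLE {} ({})".format(
        full_name,
        ",".join('%s %s' % (k, v.value) for k, v in cols.items()))
    if tags:
        sql += " TAGS ({})".format(
            ",".join('%s %s' % (k, v.value) for k, v in tags.items()))
    else:
        sql += " TAGS (dummy int) "  # a super table must have at least one tag
    return sql

print(build_create_stable("db.stb",
      {'ts': TdDataType.TIMESTAMP, 'speed': TdDataType.INT},
      {'f': TdDataType.FLOAT}))
```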
@@ -1686,7 +1695,7 @@ class TdSuperTable:
     def hasRegTables(self, dbc: DbConn):
         return dbc.query("SELECT * FROM {}.{}".format(self._dbName, self._stName)) > 0
 
-    def ensureTable(self, task: Task, dbc: DbConn, regTableName: str):
+    def ensureRegTable(self, task: Optional[Task], dbc: DbConn, regTableName: str):
         dbName = self._dbName
         sql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
         if dbc.query(sql) >= 1 : # reg table exists already
@@ -1694,7 +1703,7 @@ class TdSuperTable:
 
         # acquire a lock first, so as to be able to *verify*. More details in TD-1471
         fullTableName = dbName + '.' + regTableName
-        if task is not None: # optional lock
+        if task is not None: # TODO: what happens if we don't lock the table
             task.lockTable(fullTableName)
         Progress.emit(Progress.CREATE_TABLE_ATTEMPT) # ATTEMPT to create a new table
         # print("(" + fullTableName[-3:] + ")", end="", flush=True)
@@ -1886,7 +1895,7 @@ class TaskDropSuperTable(StateTransitionTask):
         if Dice.throw(2) == 0:
             # print("_7_", end="", flush=True)
             tblSeq = list(range(
-                2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
+                2 + (self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES)))
             random.shuffle(tblSeq)
             tickOutput = False # if we have spitted out a "d" character for "drop regular table"
             isSuccess = True
@@ -1952,13 +1961,13 @@ class TaskRestartService(StateTransitionTask):
 
     @classmethod
     def canBeginFrom(cls, state: AnyState):
-        if gConfig.auto_start_service:
+        if Config.getConfig().auto_start_service:
             return state.canDropFixedSuperTable() # Basicallly when we have the super table
         return False # don't run this otherwise
 
     CHANCE_TO_RESTART_SERVICE = 200
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
-        if not gConfig.auto_start_service: # only execute when we are in -a mode
+        if not Config.getConfig().auto_start_service: # only execute when we are in -a mode
             print("_a", end="", flush=True)
             return
 
@@ -1980,12 +1989,12 @@ class TaskAddData(StateTransitionTask):
     activeTable: Set[int] = set()
 
     # We use these two files to record operations to DB, useful for power-off tests
-    fAddLogReady = None # type: TextIOWrapper
-    fAddLogDone = None # type: TextIOWrapper
+    fAddLogReady = None # type: Optional[io.TextIOWrapper]
+    fAddLogDone = None # type: Optional[io.TextIOWrapper]
 
     @classmethod
     def prepToRecordOps(cls):
-        if gConfig.record_ops:
+        if Config.getConfig().record_ops:
             if (cls.fAddLogReady is None):
                 Logging.info(
                     "Recording in a file operations to be performed...")
|
||||||
return state.canAddData()
|
return state.canAddData()
|
||||||
|
|
||||||
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
|
def _addDataInBatch(self, db, dbc, regTableName, te: TaskExecutor):
|
||||||
numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
|
numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
|
||||||
fullTableName = db.getName() + '.' + regTableName
|
fullTableName = db.getName() + '.' + regTableName
|
||||||
|
|
||||||
sql = "INSERT INTO {} VALUES ".format(fullTableName)
|
sql = "INSERT INTO {} VALUES ".format(fullTableName)
|
||||||
|
@@ -2015,21 +2024,23 @@ class TaskAddData(StateTransitionTask):
             dbc.execute(sql)
 
     def _addData(self, db: Database, dbc, regTableName, te: TaskExecutor): # implied: NOT in batches
-        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+        numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
 
         for j in range(numRecords): # number of records per table
             nextInt = db.getNextInt()
             nextTick = db.getNextTick()
             nextColor = db.getNextColor()
-            if gConfig.record_ops:
+            if Config.getConfig().record_ops:
                 self.prepToRecordOps()
+                if self.fAddLogReady is None:
+                    raise CrashGenError("Unexpected empty fAddLogReady")
                 self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                 self.fAddLogReady.flush()
-                os.fsync(self.fAddLogReady)
+                os.fsync(self.fAddLogReady.fileno())
 
             # TODO: too ugly trying to lock the table reliably, refactor...
             fullTableName = db.getName() + '.' + regTableName
-            if gConfig.verify_data:
+            if Config.getConfig().verify_data:
                 self.lockTable(fullTableName)
                 # print("_w" + str(nextInt % 100), end="", flush=True) # Trace what was written
 
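The `record_ops` path is in effect a tiny write-ahead log for power-off testing: an intent line is flushed and fsync'ed before the INSERT, and a completion line after it (the `fAddLogDone` hunk further down). `os.fsync()` takes a file descriptor, hence the fix to pass `.fileno()`. A standalone sketch of the pattern (the log path is illustrative):

```python
import os

def record_then_do(log_path, marker, action):
    # Durably record intent, perform the operation, then record completion.
    with open(log_path, 'a') as f:
        f.write("Ready: {}\n".format(marker))
        f.flush()                  # push Python's buffers to the OS...
        os.fsync(f.fileno())       # ...and force the OS to hit the disk
        action()                   # the operation we want to account for after a crash
        f.write("Done: {}\n".format(marker))
        f.flush()
        os.fsync(f.fileno())

record_then_do("/tmp/ops.log", 42, lambda: None)
```

After an abrupt power-off, a "Ready" without a matching "Done" pinpoints exactly which writes may or may not have landed in the database.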
@@ -2042,7 +2053,7 @@ class TaskAddData(StateTransitionTask):
                 dbc.execute(sql)
 
                 # Quick hack, attach an update statement here. TODO: create an "update" task
-                if (not gConfig.use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
+                if (not Config.getConfig().use_shadow_db) and Dice.throw(5) == 0: # 1 in N chance, plus not using shaddow DB
                     nextInt = db.getNextInt()
                     nextColor = db.getNextColor()
                     sql = "INSERt INTO {} VALUES ('{}', {}, '{}');".format( # "INSERt" means "update" here
|
||||||
dbc.execute(sql)
|
dbc.execute(sql)
|
||||||
|
|
||||||
except: # Any exception at all
|
except: # Any exception at all
|
||||||
if gConfig.verify_data:
|
if Config.getConfig().verify_data:
|
||||||
self.unlockTable(fullTableName)
|
self.unlockTable(fullTableName)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Now read it back and verify, we might encounter an error if table is dropped
|
# Now read it back and verify, we might encounter an error if table is dropped
|
||||||
if gConfig.verify_data: # only if command line asks for it
|
if Config.getConfig().verify_data: # only if command line asks for it
|
||||||
try:
|
try:
|
||||||
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
|
readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts='{}'".
|
||||||
format(db.getName(), regTableName, nextTick))
|
format(db.getName(), regTableName, nextTick))
|
||||||
|
@@ -2085,17 +2096,19 @@ class TaskAddData(StateTransitionTask):
             # Successfully wrote the data into the DB, let's record it somehow
             te.recordDataMark(nextInt)
 
-            if gConfig.record_ops:
+            if Config.getConfig().record_ops:
+                if self.fAddLogDone is None:
+                    raise CrashGenError("Unexpected empty fAddLogDone")
                 self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                 self.fAddLogDone.flush()
-                os.fsync(self.fAddLogDone)
+                os.fsync(self.fAddLogDone.fileno())
 
     def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
         # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
         db = self._db
         dbc = wt.getDbConn()
-        numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
-        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
+        numTables = self.LARGE_NUMBER_OF_TABLES if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_TABLES
+        numRecords = self.LARGE_NUMBER_OF_RECORDS if Config.getConfig().larger_data else self.SMALL_NUMBER_OF_RECORDS
         tblSeq = list(range(numTables ))
         random.shuffle(tblSeq) # now we have random sequence
         for i in tblSeq:
@@ -2110,7 +2123,7 @@ class TaskAddData(StateTransitionTask):
             regTableName = self.getRegTableName(i) # "db.reg_table_{}".format(i)
             fullTableName = dbName + '.' + regTableName
             # self._lockTable(fullTableName) # "create table" below. Stop it if the table is "locked"
-            sTable.ensureTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
+            sTable.ensureRegTable(self, wt.getDbConn(), regTableName) # Ensure the table exists
             # self._unlockTable(fullTableName)
 
             if Dice.throw(1) == 0: # 1 in 2 chance
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._allStacks = {}
|
self._allStacks = {}
|
||||||
allFrames = sys._current_frames()
|
allFrames = sys._current_frames()
|
||||||
for th in threading.enumerate():
|
for th in threading.enumerate():
|
||||||
|
if th.ident is None:
|
||||||
|
continue
|
||||||
stack = traceback.extract_stack(allFrames[th.ident])
|
stack = traceback.extract_stack(allFrames[th.ident])
|
||||||
self._allStacks[th.native_id] = stack
|
self._allStacks[th.native_id] = stack
|
||||||
|
|
||||||
|
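`ThreadStacks` snapshots every live thread's stack via `sys._current_frames()`; the new guard skips threads whose `ident` has not been assigned yet (a thread object that was created but not started). A standalone sketch of the technique:

```python
import sys
import threading
import time
import traceback

def snapshot_stacks():
    frames = sys._current_frames()            # {thread ident: topmost frame}
    stacks = {}
    for th in threading.enumerate():
        if th.ident is None:                  # not started yet, no frame to read
            continue
        stacks[th.name] = traceback.extract_stack(frames[th.ident])
    return stacks

t = threading.Thread(target=time.sleep, args=(0.5,), daemon=True)
t.start()
for name, stack in snapshot_stacks().items():
    print(name, "->", stack[-1].line)         # show where each thread currently is
```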
@@ -2246,14 +2261,15 @@ class ClientManager:
 
     def run(self, svcMgr):
         # self._printLastNumbers()
-        global gConfig
+        # global gConfig
 
         # Prepare Tde Instance
         global gContainer
         tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
 
-        dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function
-        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
+        cfg = Config.getConfig()
+        dbManager = DbManager(cfg.connector_type, tInst.getDbTarget()) # Regular function
+        thPool = ThreadPool(cfg.num_threads, cfg.max_steps)
         self.tc = ThreadCoordinator(thPool, dbManager)
 
         Logging.info("Starting client instance: {}".format(tInst))
|
||||||
|
|
||||||
|
|
||||||
# Release global variables
|
# Release global variables
|
||||||
gConfig = None
|
# gConfig = None
|
||||||
|
Config.clearConfig()
|
||||||
gSvcMgr = None
|
gSvcMgr = None
|
||||||
logger = None
|
logger = None
|
||||||
|
|
||||||
|
@@ -2297,7 +2314,7 @@ class ClientManager:
 class MainExec:
     def __init__(self):
         self._clientMgr = None
-        self._svcMgr = None # type: ServiceManager
+        self._svcMgr = None # type: Optional[ServiceManager]
 
         signal.signal(signal.SIGTERM, self.sigIntHandler)
         signal.signal(signal.SIGINT, self.sigIntHandler)
@@ -2317,7 +2334,7 @@ class MainExec:
 
     def runClient(self):
         global gSvcMgr
-        if gConfig.auto_start_service:
+        if Config.getConfig().auto_start_service:
             gSvcMgr = self._svcMgr = ServiceManager(1) # hack alert
             gSvcMgr.startTaosServices() # we start, don't run
 
@@ -2326,26 +2343,18 @@ class MainExec:
         try:
             ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
         except requests.exceptions.ConnectionError as err:
-            Logging.warning("Failed to open REST connection to DB: {}".format(err.getMessage()))
+            Logging.warning("Failed to open REST connection to DB: {}".format(err))
             # don't raise
         return ret
 
     def runService(self):
         global gSvcMgr
-        gSvcMgr = self._svcMgr = ServiceManager(gConfig.num_dnodes) # save it in a global variable TODO: hack alert
+        gSvcMgr = self._svcMgr = ServiceManager(Config.getConfig().num_dnodes) # save it in a global variable TODO: hack alert
 
         gSvcMgr.run() # run to some end state
         gSvcMgr = self._svcMgr = None
 
-    def init(self): # TODO: refactor
-        global gContainer
-        gContainer = Container() # micky-mouse DI
-
-        global gSvcMgr # TODO: refactor away
-        gSvcMgr = None
-
-        # Super cool Python argument library:
-        # https://docs.python.org/3/library/argparse.html
+    def _buildCmdLineParser(self):
         parser = argparse.ArgumentParser(
             formatter_class=argparse.RawDescriptionHelpFormatter,
             description=textwrap.dedent('''\
@@ -2466,20 +2475,29 @@ class MainExec:
             action='store_true',
             help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
 
-        global gConfig
-        gConfig = parser.parse_args()
-        crash_gen.settings.gConfig = gConfig # TODO: fix this hack, consolidate this global var
+        return parser
+
+    def init(self): # TODO: refactor
+        global gContainer
+        gContainer = Container() # micky-mouse DI
+
+        global gSvcMgr # TODO: refactor away
+        gSvcMgr = None
+
+        parser = self._buildCmdLineParser()
+        Config.init(parser)
 
         # Sanity check for arguments
-        if gConfig.use_shadow_db and gConfig.max_dbs>1 :
+        if Config.getConfig().use_shadow_db and Config.getConfig().max_dbs>1 :
            raise CrashGenError("Cannot combine use-shadow-db with max-dbs of more than 1")
 
-        Logging.clsInit(gConfig)
+        Logging.clsInit(Config.getConfig().debug)
 
         Dice.seed(0) # initial seeding of dice
 
     def run(self):
-        if gConfig.run_tdengine: # run server
+        if Config.getConfig().run_tdengine: # run server
             try:
                 self.runService()
                 return 0 # success
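This hunk is the heart of the refactor: the `gConfig` argparse namespace that was scattered through the code becomes a holder initialized once from the parser and read everywhere via `Config.getConfig()`. The PR's `Config` lives in `.shared.config` and its internals are not part of this diff; a minimal sketch of the same idea, with method names taken from the call sites above and internals assumed:

```python
import argparse

class Config:
    _config = None  # the parsed argparse.Namespace, set exactly once

    @classmethod
    def init(cls, parser: argparse.ArgumentParser):
        cls._config = parser.parse_args()

    @classmethod
    def getConfig(cls):
        if cls._config is None:
            raise RuntimeError("Config.init() was never called")
        return cls._config

    @classmethod
    def isSet(cls, name: str) -> bool:
        # An option counts as "set" when present and truthy, e.g. mix_oos_data.
        return bool(getattr(cls.getConfig(), name, False))

    @classmethod
    def clearConfig(cls):
        cls._config = None

parser = argparse.ArgumentParser()
parser.add_argument('--debug', action='store_true')
Config.init(parser)
print(Config.getConfig().debug, Config.isSet('debug'))
```

Compared with a bare module-level `gConfig`, access before initialization now fails loudly instead of raising a confusing `AttributeError`, and `clearConfig()` gives tests a clean slate.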
@@ -1,25 +1,33 @@
+from __future__ import annotations
+
 import os
 import io
 import sys
+from enum import Enum
 import threading
 import signal
 import logging
 import time
-import subprocess
+from subprocess import PIPE, Popen, TimeoutExpired
 
-from typing import IO, List
+from typing import BinaryIO, Generator, IO, List, NewType, Optional
+import typing
 
 try:
     import psutil
 except:
     print("Psutil module needed, please install: sudo pip3 install psutil")
     sys.exit(-1)
 
 from queue import Queue, Empty
 
-from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
-from .db import DbConn, DbTarget
-import crash_gen.settings
+from .shared.config import Config
+from .shared.db import DbTarget, DbConn
+from .shared.misc import Logging, Helper, CrashGenError, Status, Progress, Dice
+from .shared.types import DirPath
+
+# from crash_gen.misc import CrashGenError, Dice, Helper, Logging, Progress, Status
+# from crash_gen.db import DbConn, DbTarget
+# from crash_gen.settings import Config
+# from crash_gen.types import DirPath
 
 class TdeInstance():
     """
@@ -68,7 +76,10 @@ class TdeInstance():
         self._fepPort = fepPort
 
         self._tInstNum = tInstNum
-        self._smThread = ServiceManagerThread()
+
+        # An "Tde Instance" will *contain* a "sub process" object, with will/may use a thread internally
+        # self._smThread = ServiceManagerThread()
+        self._subProcess = None # type: Optional[TdeSubProcess]
 
     def getDbTarget(self):
         return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
@@ -153,23 +164,24 @@ quorum 2
     def getExecFile(self): # .../taosd
         return self._buildDir + "/build/bin/taosd"
 
-    def getRunDir(self): # TODO: rename to "root dir" ?!
-        return self._buildDir + self._subdir
+    def getRunDir(self) -> DirPath : # TODO: rename to "root dir" ?!
+        return DirPath(self._buildDir + self._subdir)
 
-    def getCfgDir(self): # path, not file
-        return self.getRunDir() + "/cfg"
+    def getCfgDir(self) -> DirPath : # path, not file
+        return DirPath(self.getRunDir() + "/cfg")
 
-    def getLogDir(self):
-        return self.getRunDir() + "/log"
+    def getLogDir(self) -> DirPath :
+        return DirPath(self.getRunDir() + "/log")
 
     def getHostAddr(self):
         return "127.0.0.1"
 
     def getServiceCmdLine(self): # to start the instance
         cmdLine = []
-        if crash_gen.settings.gConfig.track_memory_leaks:
+        if Config.getConfig().track_memory_leaks:
             Logging.info("Invoking VALGRIND on service...")
             cmdLine = ['valgrind', '--leak-check=yes']
+        # TODO: move "exec -c" into Popen(), we can both "use shell" and NOT fork so ask to lose kill control
         cmdLine += ["exec " + self.getExecFile(), '-c', self.getCfgDir()] # used in subproce.Popen()
         return cmdLine
 
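Judging by the `NewType` import and the `DirPath(...)` wrapping above, `DirPath` (from `.shared.types`) is a `typing.NewType` over `str`: the getters now return a type a checker can distinguish from arbitrary strings, at zero runtime cost. A sketch of how such a type behaves (function names here are illustrative):

```python
from typing import NewType

DirPath = NewType('DirPath', str)   # distinct from str for the type checker only

def get_run_dir(build_dir: str, subdir: str) -> DirPath:
    return DirPath(build_dir + subdir)

def get_cfg_dir(run_dir: DirPath) -> DirPath:
    # str concatenation yields a plain str, so we re-wrap the result
    return DirPath(run_dir + "/cfg")

run = get_run_dir("/opt/build", "/test_i0")
print(get_cfg_dir(run))             # at runtime DirPath(x) is just x
# get_cfg_dir("/some/plain/string")  # mypy flags this; runtime would not care
```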
@@ -196,27 +208,46 @@ quorum 2
         dbc.close()
 
     def getStatus(self):
-        return self._smThread.getStatus()
+        # return self._smThread.getStatus()
+        if self._subProcess is None:
+            return Status(Status.STATUS_EMPTY)
+        return self._subProcess.getStatus()
 
-    def getSmThread(self):
-        return self._smThread
+    # def getSmThread(self):
+    #     return self._smThread
 
     def start(self):
-        if not self.getStatus().isStopped():
+        if self.getStatus().isActive():
             raise CrashGenError("Cannot start instance from status: {}".format(self.getStatus()))
 
         Logging.info("Starting TDengine instance: {}".format(self))
         self.generateCfgFile() # service side generates config file, client does not
         self.rotateLogs()
 
-        self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
+        # self._smThread.start(self.getServiceCmdLine(), self.getLogDir()) # May raise exceptions
+        self._subProcess = TdeSubProcess(self.getServiceCmdLine(), self.getLogDir())
 
     def stop(self):
-        self._smThread.stop()
+        self._subProcess.stop()
+        self._subProcess = None
 
     def isFirst(self):
         return self._tInstNum == 0
 
+    def printFirst10Lines(self):
+        if self._subProcess is None:
+            Logging.warning("Incorrect TI status for procIpcBatch-10 operation")
+            return
+        self._subProcess.procIpcBatch(trimToTarget=10, forceOutput=True)
+
+    def procIpcBatch(self):
+        if self._subProcess is None:
+            Logging.warning("Incorrect TI status for procIpcBatch operation")
+            return
+        self._subProcess.procIpcBatch() # may enounter EOF and change status to STOPPED
+        if self._subProcess.getStatus().isStopped():
+            self._subProcess.stop()
+            self._subProcess = None
 
 class TdeSubProcess:
     """
@ -225,42 +256,57 @@ class TdeSubProcess:
|
||||||
|
|
||||||
It takes a TdeInstance object as its parameter, with the rationale being
|
It takes a TdeInstance object as its parameter, with the rationale being
|
||||||
"a sub process runs an instance".
|
"a sub process runs an instance".
|
||||||
|
|
||||||
|
We aim to ensure that this object has exactly the same life-cycle as the
|
||||||
|
underlying sub process.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# RET_ALREADY_STOPPED = -1
|
# RET_ALREADY_STOPPED = -1
|
||||||
# RET_TIME_OUT = -3
|
# RET_TIME_OUT = -3
|
||||||
# RET_SUCCESS = -4
|
# RET_SUCCESS = -4
|
||||||
|
|
||||||
-    def __init__(self):
-        self.subProcess = None  # type: subprocess.Popen
-        # if tInst is None:
-        #     raise CrashGenError("Empty instance not allowed in TdeSubProcess")
-        # self._tInst = tInst  # Default create at ServiceManagerThread
+    def __init__(self, cmdLine: List[str], logDir: DirPath):
+        # Create the process + managing thread immediately
+        Logging.info("Attempting to start TAOS sub process...")
+        self._popen = self._start(cmdLine)  # the actual sub process
+        self._smThread = ServiceManagerThread(self, logDir)  # A thread to manage the sub process, mostly to process the IO
+        Logging.info("Successfully started TAOS process: {}".format(self))

     def __repr__(self):
-        if self.subProcess is None:
-            return '[TdeSubProc: Empty]'
-        return '[TdeSubProc: pid = {}]'.format(self.getPid())
+        # if self.subProcess is None:
+        #     return '[TdeSubProc: Empty]'
+        return '[TdeSubProc: pid = {}, status = {}]'.format(
+            self.getPid(), self.getStatus())

-    def getStdOut(self):
-        return self.subProcess.stdout
+    def getStdOut(self) -> BinaryIO:
+        if self._popen.universal_newlines:  # alias of text_mode
+            raise CrashGenError("We need binary mode for STDOUT IPC")
+        # Logging.info("Type of stdout is: {}".format(type(self._popen.stdout)))
+        return typing.cast(BinaryIO, self._popen.stdout)

-    def getStdErr(self):
-        return self.subProcess.stderr
+    def getStdErr(self) -> BinaryIO:
+        if self._popen.universal_newlines:  # alias of text_mode
+            raise CrashGenError("We need binary mode for STDERR IPC")
+        return typing.cast(BinaryIO, self._popen.stderr)

-    def isRunning(self):
-        return self.subProcess is not None
+    # Now it's always running, since we matched the life cycle
+    # def isRunning(self):
+    #     return self.subProcess is not None

     def getPid(self):
-        return self.subProcess.pid
+        return self._popen.pid
-    def start(self, cmdLine):
+    def _start(self, cmdLine) -> Popen:
         ON_POSIX = 'posix' in sys.builtin_module_names

         # Sanity check
-        if self.subProcess:  # already there
-            raise RuntimeError("Corrupt process state")
+        # if self.subProcess:  # already there
+        #     raise RuntimeError("Corrupt process state")

         # Prepare environment variables for coverage information
         # Ref: https://stackoverflow.com/questions/2231227/python-subprocess-popen-with-a-modified-environment
         myEnv = os.environ.copy()

@@ -270,15 +316,12 @@ class TdeSubProcess:
         # print("Starting TDengine with env: ", myEnv.items())
         # print("Starting TDengine via Shell: {}".format(cmdLineStr))

-        useShell = True  # Needed to pass environments into it
-        self.subProcess = subprocess.Popen(
-            # ' '.join(cmdLine) if useShell else cmdLine,
-            # shell=useShell,
-            ' '.join(cmdLine),
-            shell=True,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            # bufsize=1,  # not supported in binary mode
+        # useShell = True  # Needed to pass environments into it
+        return Popen(
+            ' '.join(cmdLine),  # ' '.join(cmdLine) if useShell else cmdLine,
+            shell=True,  # Always use shell, since we need to pass ENV vars
+            stdout=PIPE,
+            stderr=PIPE,
             close_fds=ON_POSIX,
             env=myEnv
         )  # had text=True, which interfered with reading EOF
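Since the command line is joined into a single string and run with shell=True, anything placed in myEnv is visible both to the shell and to the spawned taosd. A small hedged demonstration of the same spawn settings (the variable name is made up for illustration):

    import os
    import subprocess
    import sys

    ON_POSIX = 'posix' in sys.builtin_module_names
    env = os.environ.copy()
    env['COVERAGE_FILE'] = '/tmp/.coverage'  # hypothetical variable

    proc = subprocess.Popen(
        'echo $COVERAGE_FILE',   # stand-in for the real taosd command line
        shell=True,              # the shell expands the env var
        stdout=subprocess.PIPE,  # binary-mode pipes, no text=True
        stderr=subprocess.PIPE,
        close_fds=ON_POSIX,
        env=env)
    print(proc.stdout.readline())  # b'/tmp/.coverage\n'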
@@ -288,7 +331,9 @@ class TdeSubProcess:

     def stop(self):
         """
-        Stop a sub process, DO NOT return anything, process all conditions INSIDE
+        Stop a sub process, DO NOT return anything, process all conditions INSIDE.
+
+        Calling function should immediately delete/unreference the object

         Common POSIX signal values (from man -7 signal):
         SIGHUP 1
@@ -306,29 +351,39 @@ class TdeSubProcess:
         SIGSEGV 11
         SIGUSR2 12
         """
-        if not self.subProcess:
-            Logging.error("Sub process already stopped")
+        # self._popen should always be valid.
+
+        Logging.info("Terminating TDengine service running as the sub process...")
+        if self.getStatus().isStopped():
+            Logging.info("Service already stopped")
+            return
+        if self.getStatus().isStopping():
+            Logging.info("Service is already being stopped, pid: {}".format(self.getPid()))
             return

-        retCode = self.subProcess.poll()  # ret -N means killed with signal N, otherwise it's from exit(N)
+        self.setStatus(Status.STATUS_STOPPING)
+
+        retCode = self._popen.poll()  # ret -N means killed with signal N, otherwise it's from exit(N)
         if retCode:  # valid return code, process ended
             # retCode = -retCode  # only if valid
             Logging.warning("TSP.stop(): process ended itself")
-            self.subProcess = None
+            # self.subProcess = None
             return

         # process still alive, let's interrupt it
-        self._stopForSure(self.subProcess, self.STOP_SIGNAL)  # success if no exception
-        self.subProcess = None
+        self._stopForSure(self._popen, self.STOP_SIGNAL)  # success if no exception

         # sub process should end, then IPC queue should end, causing IO thread to end
+        self._smThread.stop()  # stop for sure too
+
+        self.setStatus(Status.STATUS_STOPPED)
     @classmethod
-    def _stopForSure(cls, proc: subprocess.Popen, sig: int):
+    def _stopForSure(cls, proc: Popen, sig: int):
         '''
         Stop a process and all sub processes with a signal, and SIGKILL if necessary
         '''
-        def doKillTdService(proc: subprocess.Popen, sig: int):
+        def doKillTdService(proc: Popen, sig: int):
             Logging.info("Killing sub-sub process {} with signal {}".format(proc.pid, sig))
             proc.send_signal(sig)
             try:
@@ -340,7 +395,7 @@ class TdeSubProcess:
                 else:
                     Logging.warning("TD service terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
                 return True  # terminated successfully
-            except subprocess.TimeoutExpired as err:
+            except TimeoutExpired as err:
                 Logging.warning("Failed to kill sub-sub process {} with signal {}".format(proc.pid, sig))
                 return False  # failed to terminate
@@ -349,22 +404,22 @@ class TdeSubProcess:
             Logging.info("Killing sub-sub process {} with signal {}".format(child.pid, sig))
             child.send_signal(sig)
             try:
-                retCode = child.wait(20)
-                if (- retCode) == signal.SIGSEGV:  # Crashed
+                retCode = child.wait(20)  # type: ignore
+                if (- retCode) == signal.SIGSEGV:  # type: ignore # Crashed
                     Logging.warning("Process {} CRASHED, please check CORE file!".format(child.pid))
-                elif (- retCode) == sig:
+                elif (- retCode) == sig:  # type: ignore
                     Logging.info("Sub-sub process terminated with expected return code {}".format(sig))
                 else:
-                    Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))
+                    Logging.warning("Process terminated, EXPECTING ret code {}, got {}".format(sig, -retCode))  # type: ignore
                 return True  # terminated successfully
             except psutil.TimeoutExpired as err:
                 Logging.warning("Failed to kill sub-sub process {} with signal {}".format(child.pid, sig))
                 return False  # did not terminate
-        def doKill(proc: subprocess.Popen, sig: int):
+        def doKill(proc: Popen, sig: int):
             pid = proc.pid
             try:
-                topSubProc = psutil.Process(pid)
+                topSubProc = psutil.Process(pid)  # Now that we are doing "exec -c", should not have children any more
                 for child in topSubProc.children(recursive=True):  # or parent.children() for recursive=False
                     Logging.warning("Unexpected child to be killed")
                     doKillChild(child, sig)
@@ -389,19 +444,26 @@ class TdeSubProcess:
             return doKill(proc, sig)

         def hardKill(proc):
             return doKill(proc, signal.SIGKILL)

         pid = proc.pid
         Logging.info("Terminate running processes under {}, with SIG #{} and wait...".format(pid, sig))
         if softKill(proc, sig):
-            return# success
+            return  # success
         if sig != signal.SIGKILL:  # really was soft above
             if hardKill(proc):
                 return
         raise CrashGenError("Failed to stop process, pid={}".format(pid))

+    def getStatus(self):
+        return self._smThread.getStatus()
+
+    def setStatus(self, status):
+        self._smThread.setStatus(status)
+
+    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
+        self._smThread.procIpcBatch(trimToTarget, forceOutput)
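The nested helpers above implement a soft-kill-then-hard-kill escalation: send the requested signal, wait a bounded time, and fall back to SIGKILL only if needed. A condensed, self-contained sketch of the same idea using only the standard library (psutil child handling omitted; names are illustrative):

    import signal
    import subprocess

    def stop_for_sure(proc, sig):
        '''Send `sig`; if the process survives 20s, escalate to SIGKILL.'''
        for attempt in (sig, signal.SIGKILL):
            proc.send_signal(attempt)
            try:
                proc.wait(20)
                return  # terminated
            except subprocess.TimeoutExpired:
                continue  # escalate on the next pass
        raise RuntimeError("Failed to stop process, pid={}".format(proc.pid))

    p = subprocess.Popen(["sleep", "1000"])
    stop_for_sure(p, signal.SIGINT)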
 class ServiceManager:
     PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process

@@ -498,10 +560,10 @@ class ServiceManager:
     def isActive(self):
         """
         Determine if the service/cluster is active at all, i.e. at least
-        one thread is not "stopped".
+        one instance is active
         """
         for ti in self._tInsts:
-            if not ti.getStatus().isStopped():
+            if ti.getStatus().isActive():
                 return True
         return False
@@ -539,10 +601,10 @@ class ServiceManager:
             # while self.isRunning() or self.isRestarting():  # for as long as the svc mgr thread is still here
             status = ti.getStatus()
             if status.isRunning():
-                th = ti.getSmThread()
-                th.procIpcBatch()  # regular processing,
+                # th = ti.getSmThread()
+                ti.procIpcBatch()  # regular processing,
             if status.isStopped():
-                th.procIpcBatch()  # one last time?
+                ti.procIpcBatch()  # one last time?
             # self._updateThreadStatus()

         time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
@@ -572,7 +634,8 @@ class ServiceManager:
         if not ti.isFirst():
             tFirst = self._getFirstInstance()
             tFirst.createDnode(ti.getDbTarget())
-        ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines
+        ti.printFirst10Lines()
+        # ti.getSmThread().procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines

     def stopTaosServices(self):
         with self._lock:
@@ -618,21 +681,24 @@ class ServiceManagerThread:
     """
     MAX_QUEUE_SIZE = 10000

-    def __init__(self):
+    def __init__(self, subProc: TdeSubProcess, logDir: str):
         # Set the sub process
-        self._tdeSubProcess = None  # type: TdeSubProcess
+        # self._tdeSubProcess = None  # type: TdeSubProcess

         # Arrange the TDengine instance
         # self._tInstNum = tInstNum  # instance serial number in cluster, ZERO based
         # self._tInst = tInst or TdeInstance()  # Need an instance

-        self._thread = None  # The actual thread, # type: threading.Thread
-        self._thread2 = None  # watching stderr
+        # self._thread = None  # type: Optional[threading.Thread]  # The actual thread
+        # self._thread2 = None  # type: Optional[threading.Thread]  # watching stderr
         self._status = Status(Status.STATUS_STOPPED)  # The status of the underlying service, actually.

+        self._start(subProc, logDir)
     def __repr__(self):
-        return "[SvcMgrThread: status={}, subProc={}]".format(
-            self.getStatus(), self._tdeSubProcess)
+        raise CrashGenError("SMT status moved to TdeSubProcess")
+        # return "[SvcMgrThread: status={}, subProc={}]".format(
+        #     self.getStatus(), self._tdeSubProcess)

     def getStatus(self):
         '''
@@ -640,30 +706,33 @@ class ServiceManagerThread:
         '''
         return self._status

+    def setStatus(self, statusVal: int):
+        self._status.set(statusVal)
     # Start the thread (with sub process), and wait for the sub service
     # to become fully operational
-    def start(self, cmdLine: str, logDir: str):
+    def _start(self, subProc: TdeSubProcess, logDir: str):
         '''
         Request the manager thread to start a new sub process, and manage it.

         :param cmdLine: the command line to invoke
         :param logDir: the logging directory, to hold stdout/stderr files
         '''
-        if self._thread:
-            raise RuntimeError("Unexpected _thread")
-        if self._tdeSubProcess:
-            raise RuntimeError("TDengine sub process already created/running")
+        # if self._thread:
+        #     raise RuntimeError("Unexpected _thread")
+        # if self._tdeSubProcess:
+        #     raise RuntimeError("TDengine sub process already created/running")

-        Logging.info("Attempting to start TAOS service: {}".format(self))
+        # Moved to TdeSubProcess
+        # Logging.info("Attempting to start TAOS service: {}".format(self))

         self._status.set(Status.STATUS_STARTING)
-        self._tdeSubProcess = TdeSubProcess()
-        self._tdeSubProcess.start(cmdLine)  # TODO: verify process is running
+        # self._tdeSubProcess = TdeSubProcess.start(cmdLine)  # TODO: verify process is running

-        self._ipcQueue = Queue()
+        self._ipcQueue = Queue()  # type: Queue
         self._thread = threading.Thread(  # First thread captures server OUTPUT
             target=self.svcOutputReader,
-            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue, logDir))
+            args=(subProc.getStdOut(), self._ipcQueue, logDir))
         self._thread.daemon = True  # thread dies with the program
         self._thread.start()
         time.sleep(0.01)
@@ -675,7 +744,7 @@ class ServiceManagerThread:
         self._thread2 = threading.Thread(  # 2nd thread captures server ERRORs
             target=self.svcErrorReader,
-            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue, logDir))
+            args=(subProc.getStdErr(), self._ipcQueue, logDir))
         self._thread2.daemon = True  # thread dies with the program
         self._thread2.start()
         time.sleep(0.01)
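Both capture threads push decoded output into the shared _ipcQueue, the standard way to consume subprocess pipes without blocking the main thread (see the Stack Overflow reference cited further down in svcOutputReader). A self-contained sketch of the pattern, with a trivial shell command standing in for taosd:

    import subprocess
    import threading
    from queue import Queue, Empty

    def reader(stream, q):
        # Daemon-thread body: drain one pipe until EOF, then close it.
        for line in iter(stream.readline, b''):
            q.put(line.decode('utf-8', errors='replace').rstrip())
        stream.close()

    proc = subprocess.Popen('echo out; echo err 1>&2', shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    q = Queue()
    for stream in (proc.stdout, proc.stderr):
        t = threading.Thread(target=reader, args=(stream, q))
        t.daemon = True  # dies with the program
        t.start()

    proc.wait()
    while True:
        try:
            print(q.get(timeout=0.2))
        except Empty:
            break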
@@ -690,14 +759,14 @@ class ServiceManagerThread:
             Progress.emit(Progress.SERVICE_START_NAP)
             # print("_zz_", end="", flush=True)
             if self._status.isRunning():
-                Logging.info("[] TDengine service READY to process requests")
-                Logging.info("[] TAOS service started: {}".format(self))
+                Logging.info("[] TDengine service READY to process requests: pid={}".format(subProc.getPid()))
+                # Logging.info("[] TAOS service started: {}".format(self))
                 # self._verifyDnode(self._tInst)  # query and ensure dnode is ready
                 # Logging.debug("[] TAOS Dnode verified: {}".format(self))
                 return  # now we've started
         # TODO: handle failure-to-start better?
         self.procIpcBatch(100, True)  # display output before conking out, trim to last 20 msgs, force output
-        raise RuntimeError("TDengine service did not start successfully: {}".format(self))
+        raise RuntimeError("TDengine service DID NOT achieve READY status: pid={}".format(subProc.getPid()))
     def _verifyDnode(self, tInst: TdeInstance):
         dbc = DbConn.createNative(tInst.getDbTarget())
@@ -717,70 +786,45 @@ class ServiceManagerThread:
                 break
         if not isValid:
             print("Failed to start dnode, sleep for a while")
-            time.sleep(600)
+            time.sleep(10.0)
             raise RuntimeError("Failed to start Dnode, expected port not found: {}".
                                format(tInst.getPort()))
         dbc.close()
     def stop(self):
         # can be called from both main thread or signal handler
-        Logging.info("Terminating TDengine service running as the sub process...")
-        if self.getStatus().isStopped():
-            Logging.info("Service already stopped")
-            return
-        if self.getStatus().isStopping():
-            Logging.info("Service is already being stopped, pid: {}".format(self._tdeSubProcess.getPid()))
-            return
-        # Linux will send Control-C generated SIGINT to the TDengine process
-        # already, ref:
-        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
-        if not self._tdeSubProcess:
-            raise RuntimeError("sub process object missing")
-
-        self._status.set(Status.STATUS_STOPPING)
-        # retCode = self._tdeSubProcess.stop()
-        # try:
-        #     retCode = self._tdeSubProcess.stop()
-        #     # print("Attempted to stop sub process, got return code: {}".format(retCode))
-        #     if retCode == signal.SIGSEGV:  # SGV
-        #         Logging.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")
-        # except subprocess.TimeoutExpired as err:
-        #     Logging.info("Time out waiting for TDengine service process to exit")
-        if not self._tdeSubProcess.stop():  # everything within
-            if self._tdeSubProcess.isRunning():  # still running, should now never happen
-                Logging.error("FAILED to stop sub process, it is still running... pid = {}".format(
-                    self._tdeSubProcess.getPid()))
-            else:
-                self._tdeSubProcess = None  # not running any more
-                self.join()  # stop the thread, change the status, etc.
+        # Linux will send Control-C generated SIGINT to the TDengine process already, ref:
+        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
+
+        self.join()  # stop the thread, status change moved to TdeSubProcess

         # Check if it's really stopped
         outputLines = 10  # for last output
         if self.getStatus().isStopped():
             self.procIpcBatch(outputLines)  # one last time
-            Logging.debug("End of TDengine Service Output: {}".format(self))
+            Logging.debug("End of TDengine Service Output")
             Logging.info("----- TDengine Service (managed by SMT) is now terminated -----\n")
         else:
-            print("WARNING: SMT did not terminate as expected: {}".format(self))
+            print("WARNING: SMT did not terminate as expected")
     def join(self):
         # TODO: sanity check
-        if not self.getStatus().isStopping():
-            raise RuntimeError(
-                "SMT.Join(): Unexpected status: {}".format(self._status))
-
-        if self._thread or self._thread2:
-            if self._thread:
-                self._thread.join()
-                self._thread = None
-            if self._thread2:  # STD ERR thread
-                self._thread2.join()
-                self._thread2 = None
-        else:
-            print("Joining empty thread, doing nothing")
-
-        self._status.set(Status.STATUS_STOPPED)
+        s = self.getStatus()
+        if s.isStopping() or s.isStopped():  # we may be stopping ourselves, or have been stopped/killed by others
+            if self._thread or self._thread2:
+                if self._thread:
+                    self._thread.join()
+                    self._thread = None
+                if self._thread2:  # STD ERR thread
+                    self._thread2.join()
+                    self._thread2 = None
+            else:
+                Logging.warning("Joining empty thread, doing nothing")
+        else:
+            raise RuntimeError(
+                "SMT.Join(): Unexpected status: {}".format(self._status))
     def _trimQueue(self, targetSize):
         if targetSize <= 0:
             return  # do nothing
@@ -799,6 +843,10 @@ class ServiceManagerThread:
     TD_READY_MSG = "TDengine is initialized successfully"

     def procIpcBatch(self, trimToTarget=0, forceOutput=False):
+        '''
+        Process a batch of STDOUT/STDERR data, until we read EMPTY from
+        the queue.
+        '''
         self._trimQueue(trimToTarget)  # trim if necessary
         # Process all the output generated by the underlying sub process,
         # managed by IO thread
@@ -827,35 +875,54 @@ class ServiceManagerThread:
             print(pBar, end="", flush=True)
             print('\b\b\b\b', end="", flush=True)

-    def svcOutputReader(self, out: IO, queue, logDir: str):
+    BinaryChunk = NewType('BinaryChunk', bytes)  # line with binary data, directly from STDOUT, etc.
+    TextChunk = NewType('TextChunk', str)  # properly decoded, suitable for printing, etc.
+
+    @classmethod
+    def _decodeBinaryChunk(cls, bChunk: bytes) -> Optional[TextChunk]:
+        try:
+            tChunk = bChunk.decode("utf-8").rstrip()
+            return cls.TextChunk(tChunk)
+        except UnicodeError:
+            print("\nNon-UTF8 server output: {}\n".format(bChunk.decode('cp437')))
+            return None
+
+    def _textChunkGenerator(self, streamIn: BinaryIO, logDir: str, logFile: str
+                            ) -> Generator[TextChunk, None, None]:
+        '''
+        Take an input stream with binary data, produce a generator of decoded
+        "text chunks", and also save the original binary data in a log file.
+        '''
+        os.makedirs(logDir, exist_ok=True)
+        logF = open(os.path.join(logDir, logFile), 'wb')
+        for bChunk in iter(streamIn.readline, b''):
+            logF.write(bChunk)  # Write to log file immediately
+            tChunk = self._decodeBinaryChunk(bChunk)  # decode
+            if tChunk is not None:
+                yield tChunk  # TODO: split into actual text lines
+
+        # At the end...
+        streamIn.close()  # Close the stream
+        logF.close()  # Close the output file
+
+    def svcOutputReader(self, stdOut: BinaryIO, queue, logDir: str):
         '''
         The infinite routine that processes the STDOUT stream for the sub process being managed.

-        :param out: the IO stream object used to fetch the data from
-        :param queue: the queue where we dump the roughly parsed line-by-line data
+        :param stdOut: the IO stream object used to fetch the data from
+        :param queue: the queue where we dump the roughly parsed chunk-by-chunk text data
         :param logDir: where we should dump a verbatim output file
         '''
-        os.makedirs(logDir, exist_ok=True)
-        logFile = os.path.join(logDir, 'stdout.log')
-        fOut = open(logFile, 'wb')
         # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
         # print("This is the svcOutput Reader...")
-        # for line in out:
-        for line in iter(out.readline, b''):
-            fOut.write(line)
-            # print("Finished reading a line: {}".format(line))
-            # print("Adding item to queue...")
-            try:
-                line = line.decode("utf-8").rstrip()
-            except UnicodeError:
-                print("\nNon-UTF8 server output: {}\n".format(line))
-
-            # This might block, and then causing "out" buffer to block
-            queue.put(line)
+        # stdOut.readline()  # Skip the first output? TODO: remove?
+        for tChunk in self._textChunkGenerator(stdOut, logDir, 'stdout.log'):
+            queue.put(tChunk)  # tChunk guaranteed not to be None
             self._printProgress("_i")

             if self._status.isStarting():  # we are starting, let's see if we have started
-                if line.find(self.TD_READY_MSG) != -1:  # found
+                if tChunk.find(self.TD_READY_MSG) != -1:  # found
                     Logging.info("Waiting for the service to become FULLY READY")
                     time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                     Logging.info("Service is now FULLY READY")  # TODO: more ID info here?
@@ -869,18 +936,17 @@ class ServiceManagerThread:
                 print("_w", end="", flush=True)

         # queue.put(line)
-        # meaning sub process must have died
-        Logging.info("EOF for TDengine STDOUT: {}".format(self))
-        out.close()  # Close the stream
-        fOut.close()  # Close the output file
+        # stdOut has no more data, meaning sub process must have died
+        Logging.info("EOF found TDengine STDOUT, marking the process as terminated")
+        self.setStatus(Status.STATUS_STOPPED)

-    def svcErrorReader(self, err: IO, queue, logDir: str):
-        os.makedirs(logDir, exist_ok=True)
-        logFile = os.path.join(logDir, 'stderr.log')
-        fErr = open(logFile, 'wb')
-        for line in iter(err.readline, b''):
-            fErr.write(line)
-            Logging.info("TDengine STDERR: {}".format(line))
-        Logging.info("EOF for TDengine STDERR: {}".format(self))
-        err.close()
-        fErr.close()
+    def svcErrorReader(self, stdErr: BinaryIO, queue, logDir: str):
+        # os.makedirs(logDir, exist_ok=True)
+        # logFile = os.path.join(logDir, 'stderr.log')
+        # fErr = open(logFile, 'wb')
+        # for line in iter(err.readline, b''):
+        for tChunk in self._textChunkGenerator(stdErr, logDir, 'stderr.log'):
+            queue.put(tChunk)  # tChunk guaranteed not to be None
+            # fErr.write(line)
+            Logging.info("TDengine STDERR: {}".format(tChunk))
+        Logging.info("EOF for TDengine STDERR")
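The generator introduced above tees raw bytes to a log file while yielding decoded text to the caller; an equivalent standalone version (temporary directory and echo command are illustrative):

    import os
    import subprocess
    import tempfile

    def text_chunks(stream, log_dir, log_file):
        # Mirror of _textChunkGenerator: save bytes verbatim, yield decoded text.
        os.makedirs(log_dir, exist_ok=True)
        with open(os.path.join(log_dir, log_file), 'wb') as log_f:
            for b_chunk in iter(stream.readline, b''):
                log_f.write(b_chunk)
                try:
                    yield b_chunk.decode('utf-8').rstrip()
                except UnicodeError:
                    pass  # bad bytes stay in the log but are not yielded
        stream.close()

    proc = subprocess.Popen('echo hello', shell=True, stdout=subprocess.PIPE)
    for chunk in text_chunks(proc.stdout, tempfile.mkdtemp(), 'stdout.log'):
        print(chunk)  # hello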
@@ -1,8 +0,0 @@
-from __future__ import annotations
-import argparse
-
-gConfig: argparse.Namespace
-
-def init():
-    global gConfig
-    gConfig = []
@@ -0,0 +1,42 @@
+from __future__ import annotations
+import argparse
+
+from typing import Optional
+
+from .misc import CrashGenError
+
+# from crash_gen.misc import CrashGenError
+
+# gConfig: Optional[argparse.Namespace]
+
+class Config:
+    _config = None  # type: Optional[argparse.Namespace]
+
+    @classmethod
+    def init(cls, parser: argparse.ArgumentParser):
+        if cls._config is not None:
+            raise CrashGenError("Config can only be initialized once")
+        cls._config = parser.parse_args()
+        # print(cls._config)
+
+    @classmethod
+    def setConfig(cls, config: argparse.Namespace):
+        cls._config = config
+
+    @classmethod
+    # TODO: check items instead of exposing everything
+    def getConfig(cls) -> argparse.Namespace:
+        if cls._config is None:
+            raise CrashGenError("invalid state")
+        return cls._config
+
+    @classmethod
+    def clearConfig(cls):
+        cls._config = None
+
+    @classmethod
+    def isSet(cls, cfgKey):
+        cfg = cls.getConfig()
+        if cfgKey not in cfg:
+            return False
+        return cfg.__getattribute__(cfgKey)
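Config replaces the old module-level gConfig with a class-level singleton that is parsed once and read anywhere. A reduced usage sketch (RuntimeError stands in for CrashGenError so the snippet runs on its own):

    import argparse

    class Config:
        _config = None  # single, process-wide namespace

        @classmethod
        def init(cls, parser, args=None):
            if cls._config is not None:
                raise RuntimeError("Config can only be initialized once")
            cls._config = parser.parse_args(args)

        @classmethod
        def getConfig(cls):
            if cls._config is None:
                raise RuntimeError("invalid state")
            return cls._config

        @classmethod
        def isSet(cls, key):
            return getattr(cls.getConfig(), key, False)

    parser = argparse.ArgumentParser()
    parser.add_argument('--debug', action='store_true')
    Config.init(parser, ['--debug'])  # normally reads sys.argv
    print(Config.isSet('debug'))      # True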
@@ -1,24 +1,26 @@
 from __future__ import annotations

 import sys
+import os
+import datetime
 import time
 import threading
 import requests
 from requests.auth import HTTPBasicAuth

 import taos
 from util.sql import *
 from util.cases import *
 from util.dnodes import *
 from util.log import *

-from .misc import Logging, CrashGenError, Helper, Dice
-import os
-import datetime
 import traceback
 # from .service_manager import TdeInstance

-import crash_gen.settings
+from .config import Config
+from .misc import Logging, CrashGenError, Helper
+from .types import QueryResult

 class DbConn:
     TYPE_NATIVE = "native-c"
@@ -79,7 +81,7 @@ class DbConn:
             raise RuntimeError("Cannot query database until connection is open")
         nRows = self.query(sql)
         if nRows != 1:
-            raise taos.error.ProgrammingError(
+            raise CrashGenError(
                 "Unexpected result for query: {}, rows = {}".format(sql, nRows),
                 (CrashGenError.INVALID_EMPTY_RESULT if nRows==0 else CrashGenError.INVALID_MULTIPLE_RESULT)
             )
@@ -115,7 +117,7 @@ class DbConn:
         try:
             self.execute(sql)
             return True  # ignore num of results, return success
-        except taos.error.ProgrammingError as err:
+        except taos.error.Error as err:
             return False  # failed, for whatever TAOS reason
         # Not possible to reach here, non-TAOS exception would have been thrown
@@ -126,7 +128,7 @@ class DbConn:
     def openByType(self):
         raise RuntimeError("Unexpected execution, should be overridden")

-    def getQueryResult(self):
+    def getQueryResult(self) -> QueryResult:
         raise RuntimeError("Unexpected execution, should be overridden")

     def getResultRows(self):
@@ -221,7 +223,7 @@ class DbConnRest(DbConn):
 class MyTDSql:
     # Class variables
     _clsLock = threading.Lock()  # class wide locking
-    longestQuery = None  # type: str
+    longestQuery = ''  # type: str
     longestQueryTime = 0.0  # seconds
     lqStartTime = 0.0
     # lqEndTime = 0.0  # Not needed, as we have the two above already
@@ -249,7 +251,13 @@ class MyTDSql:
     def _execInternal(self, sql):
         startTime = time.time()
         # Logging.debug("Executing SQL: " + sql)
+        # ret = None  # TODO: use strong type here
+        # try:  # Let's not capture the error, and let taos.error.ProgrammingError pass through
         ret = self._cursor.execute(sql)
+        # except taos.error.ProgrammingError as err:
+        #     Logging.warning("Taos SQL execution error: {}, SQL: {}".format(err.msg, sql))
+        #     raise CrashGenError(err.msg)
+
         # print("\nSQL success: {}".format(sql))
         queryTime = time.time() - startTime
         # Record the query time
@@ -261,7 +269,7 @@ class MyTDSql:
                 cls.lqStartTime = startTime

         # Now write to the shadow database
-        if crash_gen.settings.gConfig.use_shadow_db:
+        if Config.isSet('use_shadow_db'):
             if sql[:11] == "INSERT INTO":
                 if sql[:16] == "INSERT INTO db_0":
                     sql2 = "INSERT INTO db_s" + sql[16:]
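The shadow-database duplication is a plain prefix rewrite on the SQL text; for example (table name and values are illustrative):

    sql = "INSERT INTO db_0.regular_table_1 VALUES ('2021-04-01 00:00:00', 101)"
    if sql[:16] == "INSERT INTO db_0":
        sql2 = "INSERT INTO db_s" + sql[16:]
    print(sql2)  # INSERT INTO db_s.regular_table_1 VALUES ('2021-04-01 00:00:00', 101)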
@@ -453,31 +461,11 @@ class DbManager():
         ''' Release the underlying DB connection upon deletion of DbManager '''
         self.cleanUp()

-    def getDbConn(self):
+    def getDbConn(self) -> DbConn:
+        if self._dbConn is None:
+            raise CrashGenError("Unexpected empty DbConn")
         return self._dbConn

-    # TODO: not used any more, to delete
-    def pickAndAllocateTable(self):  # pick any table, and "use" it
-        return self.tableNumQueue.pickAndAllocate()
-
-    # TODO: Not used any more, to delete
-    def addTable(self):
-        with self._lock:
-            tIndex = self.tableNumQueue.push()
-        return tIndex
-
-    # Not used any more, to delete
-    def releaseTable(self, i):  # return the table back, so others can use it
-        self.tableNumQueue.release(i)
-
-    # TODO: not used any more, delete
-    def getTableNameToDelete(self):
-        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
-        if (not tblNum):  # maybe false
-            return False
-
-        return "table_{}".format(tblNum)
-
     def cleanUp(self):
         if self._dbConn:
             self._dbConn.close()
@@ -3,6 +3,7 @@ import random
 import logging
 import os
 import sys
+from typing import Optional

 import taos
@@ -39,14 +40,14 @@ class MyLoggingAdapter(logging.LoggerAdapter):

 class Logging:
-    logger = None
+    logger = None  # type: Optional[MyLoggingAdapter]

     @classmethod
     def getLogger(cls):
-        return logger
+        return cls.logger

     @classmethod
-    def clsInit(cls, gConfig):  # TODO: refactor away gConfig
+    def clsInit(cls, debugMode: bool):
         if cls.logger:
             return
@@ -60,13 +61,9 @@ class Logging:
         # Logging adapter, to be used as a logger
         # print("setting logger variable")
         # global logger
-        cls.logger = MyLoggingAdapter(_logger, [])
-
-        if (gConfig.debug):
-            cls.logger.setLevel(logging.DEBUG)  # default seems to be INFO
-        else:
-            cls.logger.setLevel(logging.INFO)
+        cls.logger = MyLoggingAdapter(_logger, {})
+        cls.logger.setLevel(logging.DEBUG if debugMode else logging.INFO)  # default seems to be INFO

     @classmethod
     def info(cls, msg):
         cls.logger.info(msg)
@@ -84,6 +81,7 @@ class Logging:
         cls.logger.error(msg)

 class Status:
+    STATUS_EMPTY = 99
     STATUS_STARTING = 1
     STATUS_RUNNING = 2
     STATUS_STOPPING = 3
@@ -95,12 +93,16 @@ class Status:
     def __repr__(self):
         return "[Status: v={}]".format(self._status)

-    def set(self, status):
+    def set(self, status: int):
         self._status = status

     def get(self):
         return self._status

+    def isEmpty(self):
+        ''' Empty/Undefined '''
+        return self._status == Status.STATUS_EMPTY
+
     def isStarting(self):
         return self._status == Status.STATUS_STARTING
@@ -117,6 +119,9 @@ class Status:
     def isStable(self):
         return self.isRunning() or self.isStopped()

+    def isActive(self):
+        return self.isStarting() or self.isRunning() or self.isStopping()

 # Deterministic random number generator
 class Dice():
     seeded = False  # static, uninitialized
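With STATUS_EMPTY added, an instance now moves EMPTY -> STARTING -> RUNNING -> STOPPING -> STOPPED, and the new isActive() is true for exactly the three middle states. A quick check with the constants from this diff (STATUS_STOPPED's value is not shown in the hunk and is assumed to be 4 here):

    STATUS_EMPTY, STATUS_STARTING, STATUS_RUNNING = 99, 1, 2
    STATUS_STOPPING, STATUS_STOPPED = 3, 4  # STATUS_STOPPED value assumed

    def is_active(v):
        return v in (STATUS_STARTING, STATUS_RUNNING, STATUS_STOPPING)

    for v in (STATUS_EMPTY, STATUS_STARTING, STATUS_RUNNING, STATUS_STOPPING, STATUS_STOPPED):
        print(v, is_active(v))  # active only for 1, 2, 3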
@@ -0,0 +1,28 @@
+from typing import Any, List, Dict, NewType
+from enum import Enum
+
+DirPath = NewType('DirPath', str)
+
+QueryResult = NewType('QueryResult', List[List[Any]])
+
+class TdDataType(Enum):
+    '''
+    Use a Python Enum type to represent all the data types in TDengine.
+
+    Ref: https://www.taosdata.com/cn/documentation/taos-sql#data-type
+    '''
+    TIMESTAMP = 'TIMESTAMP'
+    INT = 'INT'
+    BIGINT = 'BIGINT'
+    FLOAT = 'FLOAT'
+    DOUBLE = 'DOUBLE'
+    BINARY = 'BINARY'
+    BINARY16 = 'BINARY(16)'  # TODO: get rid of this hack
+    BINARY200 = 'BINARY(200)'
+    SMALLINT = 'SMALLINT'
+    TINYINT = 'TINYINT'
+    BOOL = 'BOOL'
+    NCHAR = 'NCHAR'
+
+TdColumns = Dict[str, TdDataType]
+TdTags = Dict[str, TdDataType]
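TdColumns and TdTags give super-table schemas a typed dictionary shape. A sketch of how such a dictionary can drive DDL generation (the enum is inlined so the snippet is self-contained, and the CREATE statement is only schematic):

    from enum import Enum
    from typing import Dict

    class TdDataType(Enum):
        TIMESTAMP = 'TIMESTAMP'
        INT = 'INT'
        BINARY200 = 'BINARY(200)'

    TdColumns = Dict[str, TdDataType]

    cols: TdColumns = {'ts': TdDataType.TIMESTAMP,
                       'temperature': TdDataType.INT,
                       'notes': TdDataType.BINARY200}

    # A column list can be assembled from the .value of each member:
    body = ', '.join('{} {}'.format(k, v.value) for k, v in cols.items())
    print("CREATE TABLE st ({}) TAGS (...)".format(body))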
@@ -0,0 +1,485 @@
+#!/usr/bin/python3.8
+
+from abc import abstractmethod
+
+import time
+from datetime import datetime
+
+from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketsApi
+from influxdb_client.client.write_api import SYNCHRONOUS
+
+import argparse
+import textwrap
+import subprocess
+import sys
+
+import taos
+
+from crash_gen.crash_gen_main import Database, TdSuperTable
+from crash_gen.service_manager import TdeInstance
+
+from crash_gen.shared.config import Config
+from crash_gen.shared.db import DbConn
+from crash_gen.shared.misc import Dice, Logging, Helper
+from crash_gen.shared.types import TdDataType
+
+
+# NUM_PROCESSES = 10
+# NUM_REPS = 1000
+
+tick = int(time.time() - 5000000.0)  # for now we will create max 5M records
+value = 101
+
+DB_NAME = 'mydb'
+TIME_SERIES_NAME = 'widget'
+
+MAX_SHELF = 500  # shelf number runs up to this, non-inclusive
+ITEMS_PER_SHELF = 5
+BATCH_SIZE = 2000  # Number of data points per request
+
+# None_RW:
+# INFLUX_TOKEN='RRzVQZs8ERCpV9cS2RXqgtM_Y6FEZuJ7Tuk0aHtZItFTfcM9ajixtGDhW8HzqNIBmG3hmztw-P4sHOstfJvjFA=='
+# DevOrg_RW:
+# INFLUX_TOKEN='o1P8sEhBmXKhxBmNuiCyOUKv8d7qm5wUjMff9AbskBu2LcmNPQzU77NrAn5hDil8hZ0-y1AGWpzpL-4wqjFdkA=='
+# DevOrg_All_Access
+INFLUX_TOKEN='T2QTr4sloJhINH_oSrwSS-WIIZYjDfD123NK4ou3b7ajRs0c0IphCh3bNc0OsDZQRW1HyCby7opdEndVYFGTWQ=='
+INFLUX_ORG="DevOrg"
+INFLUX_BUCKET="Bucket01"
+def writeTaosBatch(dbc, tblName):
+    # Database.setupLastTick()
+    global value, tick
+
+    data = []
+    for i in range(0, 100):
+        data.append("('{}', {})".format(Database.getNextTick(), value))
+        value += 1
+
+    sql = "INSERT INTO {} VALUES {}".format(tblName, ''.join(data))
+    dbc.execute(sql)
+
+class PerfGenError(taos.error.ProgrammingError):
+    pass
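Each call builds one multi-row INSERT whose VALUES tuples are concatenated back to back, a batch form TDengine accepts; with illustrative timestamps the generated statement looks like this:

    # Illustrative ticks/values; Database.getNextTick() supplies the real timestamps.
    rows = [("2021-04-01 00:00:00.000", 101), ("2021-04-01 00:00:00.001", 102)]
    data = ["('{}', {})".format(ts, v) for ts, v in rows]
    sql = "INSERT INTO {} VALUES {}".format("cars_reg_0", ''.join(data))
    print(sql)
    # INSERT INTO cars_reg_0 VALUES ('2021-04-01 00:00:00.000', 101)('2021-04-01 00:00:00.001', 102)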
+class Benchmark():
+
+    # @classmethod
+    # def create(cls, dbType):
+    #     if dbType == 'taos':
+    #         return TaosBenchmark()
+    #     elif dbType == 'influx':
+    #         return InfluxBenchmark()
+    #     else:
+    #         raise RuntimeError("Unknown DB type: {}".format(dbType))
+
+    def __init__(self, dbType, loopCount=0):
+        self._dbType = dbType
+        self._setLoopCount(loopCount)
+
+    def _setLoopCount(self, loopCount):
+        cfgLoopCount = Config.getConfig().loop_count
+        if loopCount == 0:  # use config
+            self._loopCount = cfgLoopCount
+        else:
+            if cfgLoopCount:
+                Logging.warning("Ignoring loop count for fixed-loop-count benchmarks: {}".format(cfgLoopCount))
+            self._loopCount = loopCount
+
+    @abstractmethod
+    def doIterate(self):
+        '''
+        Execute the benchmark directly, without invoking sub processes,
+        effectively using one execution thread.
+        '''
+        pass
+
+    @abstractmethod
+    def prepare(self):
+        '''
+        Preparation needed to run a certain benchmark
+        '''
+        pass
+
+    @abstractmethod
+    def execute(self):
+        '''
+        Actually execute the benchmark
+        '''
+        Logging.warning("Unexpected execution")
+
+    @property
+    def name(self):
+        return self.__class__.__name__
+
+    def run(self):
+        print("Running benchmark: {}, class={} ...".format(self.name, self.__class__))
+        startTime = time.time()
+
+        # Prepare to execute the benchmark
+        self.prepare()
+
+        # Actually execute the benchmark
+        self.execute()
+
+        # if Config.getConfig().iterate_directly:  # execute directly
+        #     Logging.debug("Iterating...")
+        #     self.doIterate()
+        # else:
+        #     Logging.debug("Executing via sub process...")
+        #     startTime = time.time()
+        #     self.prepare()
+        #     self.spawnProcesses()
+        #     self.waitForProcecess()
+        #     duration = time.time() - startTime
+        #     Logging.info("Benchmark execution completed in {:.3f} seconds".format(duration))
+        Logging.info("Benchmark {} finished in {:.3f} seconds".format(
+            self.name, time.time() - startTime))
+
+    def spawnProcesses(self):
+        self._subProcs = []
+        for j in range(0, Config.getConfig().subprocess_count):
+            ON_POSIX = 'posix' in sys.builtin_module_names
+            tblName = 'cars_reg_{}'.format(j)
+            cmdLineStr = './perf_gen.sh -t {} -i -n {} -l {}'.format(
+                self._dbType,
+                tblName,
+                Config.getConfig().loop_count
+            )
+            if Config.getConfig().debug:
+                cmdLineStr += ' -d'
+            subProc = subprocess.Popen(cmdLineStr,
+                                       shell=True,
+                                       close_fds=ON_POSIX)
+            self._subProcs.append(subProc)
+
+    def waitForProcecess(self):
+        for sp in self._subProcs:
+            sp.wait(300)
+class TaosBenchmark(Benchmark):
+
+    def __init__(self, loopCount):
+        super().__init__('taos', loopCount)
+        # self._dbType = 'taos'
+        tInst = TdeInstance()
+        self._dbc = DbConn.createNative(tInst.getDbTarget())
+        self._dbc.open()
+        self._sTable = TdSuperTable(TIME_SERIES_NAME + '_s', DB_NAME)
+
+    def doIterate(self):
+        tblName = Config.getConfig().target_table_name
+        print("Benchmarking TAOS database (1 pass) for: {}".format(tblName))
+        self._dbc.execute("USE {}".format(DB_NAME))
+
+        self._sTable.ensureRegTable(None, self._dbc, tblName)
+        try:
+            lCount = Config.getConfig().loop_count
+            print("({})".format(lCount))
+            for i in range(0, lCount):
+                writeTaosBatch(self._dbc, tblName)
+        except taos.error.ProgrammingError as err:
+            Logging.error("Failed to write batch")
+
+    def prepare(self):
+        self._dbc.execute("CREATE DATABASE IF NOT EXISTS {}".format(DB_NAME))
+        self._dbc.execute("USE {}".format(DB_NAME))
+        # Create the super table
+        self._sTable.drop(self._dbc, True)
+        self._sTable.create(self._dbc,
+                            {'ts': TdDataType.TIMESTAMP,
+                             'temperature': TdDataType.INT,
+                             'pressure': TdDataType.INT,
+                             'notes': TdDataType.BINARY200
+                             },
+                            {'rack': TdDataType.INT,
+                             'shelf': TdDataType.INT,
+                             'barcode': TdDataType.BINARY16
+                             })
+    def execSql(self, sql):
+        try:
+            self._dbc.execute(sql)
+        except taos.error.ProgrammingError as err:
+            Logging.warning("SQL Error: 0x{:X}, {}, SQL: {}".format(
+                Helper.convertErrno(err.errno), err.msg, sql))
+            raise
+    def executeWrite(self):
+        # Sample: INSERT INTO t1 USING st TAGS(1) VALUES(now, 1) t2 USING st TAGS(2) VALUES(now, 2)
+        sqlPrefix = "INSERT INTO "
+        dataTemplate = "{} USING {} TAGS({},{},'barcode_{}') VALUES('{}',{},{},'{}') "
+
+        stName = self._sTable.getName()
+        BATCH_SIZE = 2000  # number of items per request batch
+        ITEMS_PER_SHELF = 5
+
+        # rackSize = 10  # shelves per rack
+        # shelfSize = 100  # items per shelf
+        batchCount = self._loopCount // BATCH_SIZE
+        lastRack = 0
+        for i in range(batchCount):
+            sql = sqlPrefix
+            for j in range(BATCH_SIZE):
+                n = i * BATCH_SIZE + j  # serial number
+                # values first
+                # rtName = 'rt_' + str(n)  # table name contains serial number, has info
+                temperature = 20 + (n % 10)
+                pressure = 70 + (n % 10)
+                # tags
+                shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF  # shelf number
+                rack = n // (ITEMS_PER_SHELF * MAX_SHELF)  # rack number
+                barcode = rack + shelf
+                # table name
+                tableName = "reg_" + str(rack) + '_' + str(shelf)
+                # now the SQL
+                sql += dataTemplate.format(tableName, stName,  # table name
+                                           rack, shelf, barcode,  # tags
+                                           Database.getNextTick(), temperature, pressure, 'xxx')  # values
+                lastRack = rack
+            self.execSql(sql)
+        Logging.info("Last Rack: {}".format(lastRack))
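To see how a serial number maps onto the tag space: with ITEMS_PER_SHELF = 5 and MAX_SHELF = 500, item n = 12345 lands on shelf (12345 // 5) % 500 = 469 and rack 12345 // 2500 = 4, so barcode = 473 and the sub-table is reg_4_469:

    ITEMS_PER_SHELF, MAX_SHELF = 5, 500
    n = 12345
    shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF   # 469
    rack = n // (ITEMS_PER_SHELF * MAX_SHELF)    # 4
    print(shelf, rack, rack + shelf, "reg_{}_{}".format(rack, shelf))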
+class TaosWriteBenchmark(TaosBenchmark):
+    def execute(self):
+        self.executeWrite()
+
+class Taos100kWriteBenchmark(TaosWriteBenchmark):
+    def __init__(self):
+        super().__init__(100*1000)
+
+class Taos10kWriteBenchmark(TaosWriteBenchmark):
+    def __init__(self):
+        super().__init__(10*1000)
+
+class Taos1mWriteBenchmark(TaosWriteBenchmark):
+    def __init__(self):
+        super().__init__(1000*1000)
+
+class Taos5mWriteBenchmark(TaosWriteBenchmark):
+    def __init__(self):
+        super().__init__(5*1000*1000)
+
+class Taos1kQueryBenchmark(TaosBenchmark):
+    def __init__(self):
+        super().__init__(1000)
+
+class Taos1MCreationBenchmark(TaosBenchmark):
+    def __init__(self):
+        super().__init__(1000000)
+class InfluxBenchmark(Benchmark):
+    def __init__(self, loopCount):
+        super().__init__('influx', loopCount)
+        # self._dbType = 'influx'
+
+        # self._client = InfluxDBClient(host='localhost', port=8086)
+
+    # def _writeBatch(self, tblName):
+    #     global value, tick
+    #     data = []
+    #     for i in range(0, 100):
+    #         line = "{},device={} value={} {}".format(
+    #             TIME_SERIES_NAME,
+    #             tblName,
+    #             value,
+    #             tick*1000000000)
+    #         # print(line)
+    #         data.append(line)
+    #         value += 1
+    #         tick += 1
+
+    #     self._client.write(data, {'db':DB_NAME}, protocol='line')
+    def executeWrite(self):
+        global tick  # influx tick  # TODO: refactor
+
+        lineTemplate = TIME_SERIES_NAME + ",rack={},shelf={},barcode='barcode_{}' temperature={},pressure={} {}"
+
+        batchCount = self._loopCount // BATCH_SIZE
+        for i in range(batchCount):
+            lineBatch = []
+            for j in range(BATCH_SIZE):
+                n = i * BATCH_SIZE + j  # serial number
+                # values first
+                # rtName = 'rt_' + str(n)  # table name contains serial number, has info
+                temperature = 20 + (n % 10)
+                pressure = 70 + (n % 10)
+                # tags
+                shelf = (n // ITEMS_PER_SHELF) % MAX_SHELF  # shelf number
+                rack = n // (ITEMS_PER_SHELF * MAX_SHELF)  # rack number
+                barcode = rack + shelf
+                # now the SQL
+                line = lineTemplate.format(
+                    rack, shelf, barcode,  # tags
+                    temperature, pressure,  # values
+                    tick * 1000000000)
+                tick += 1
+                lineBatch.append(line)
+            write_api = self._client.write_api(write_options=SYNCHRONOUS)
+            write_api.write(INFLUX_BUCKET, INFLUX_ORG, lineBatch)
+            # self._client.write(lineBatch, {'db':DB_NAME}, protocol='line')
+
+    # def doIterate(self):
+    #     tblName = Config.getConfig().target_table_name
+    #     print("Benchmarking INFLUX database (1 pass) for: {}".format(tblName))
+
+    #     for i in range(0, Config.getConfig().loop_count):
+    #         self._writeBatch(tblName)
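For reference, one entry produced by lineTemplate, using the n = 12345 example above and an illustrative timestamp, comes out as InfluxDB line protocol:

    TIME_SERIES_NAME = 'widget'
    lineTemplate = TIME_SERIES_NAME + ",rack={},shelf={},barcode='barcode_{}' temperature={},pressure={} {}"
    print(lineTemplate.format(4, 469, 473, 25, 75, 1617235200 * 1000000000))
    # widget,rack=4,shelf=469,barcode='barcode_473' temperature=25,pressure=75 1617235200000000000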
+    def _getOrgIdByName(self, orgName):
+        """Find org by name."""
+        orgApi = self._client.organizations_api()
+        orgs = orgApi.find_organizations()
+        for org in orgs:
+            if org.name == orgName:
+                return org.id
+        raise PerfGenError("Org not found with name: {}".format(orgName))
+
+    def _fetchAuth(self):
+        authApi = self._client.authorizations_api()
+        auths = authApi.find_authorizations()
+        for auth in auths:
+            if auth.token == INFLUX_TOKEN:
+                return auth
+        raise PerfGenError("No proper auth found")
+
+    def _verifyPermissions(self, perms: list):
+        if perms:
+            return  # OK
+        raise PerfGenError("No permission found")
    def prepare(self):
        self._client = InfluxDBClient(
            url="http://127.0.0.1:8086",
            token=INFLUX_TOKEN,
            org=INFLUX_ORG)

        auth = self._fetchAuth()

        self._verifyPermissions(auth.permissions)

        bktApi = self._client.buckets_api()
        # Delete the bucket if it already exists
        bkt = bktApi.find_bucket_by_name(INFLUX_BUCKET)
        if bkt:
            bktApi.delete_bucket(bkt)
        # Recreate it, so that each run starts from an empty bucket

        orgId = self._getOrgIdByName(INFLUX_ORG)
        bktApi.create_bucket(bucket=None, bucket_name=INFLUX_BUCKET, org_id=orgId)

        # self._client.drop_database(DB_NAME)
        # self._client.create_database(DB_NAME)
        # self._client.switch_database(DB_NAME)

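# A minimal sanity-check sketch (not part of the original tool): after
# InfluxBenchmark.prepare() runs, the target bucket should exist again. It
# reuses the same influxdb_client 2.x API that prepare() uses; INFLUX_TOKEN,
# INFLUX_ORG and INFLUX_BUCKET are the module-level constants assumed above.
#
# def _checkBucketRecreated():
#     from influxdb_client import InfluxDBClient
#     client = InfluxDBClient(url="http://127.0.0.1:8086",
#                             token=INFLUX_TOKEN, org=INFLUX_ORG)
#     bkt = client.buckets_api().find_bucket_by_name(INFLUX_BUCKET)
#     assert bkt is not None, "prepare() should have recreated the bucket"
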
class InfluxWriteBenchmark(InfluxBenchmark):
    def execute(self):
        return self.executeWrite()


class Influx10kWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(10*1000)


class Influx100kWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(100*1000)


class Influx1mWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(1000*1000)


class Influx5mWriteBenchmark(InfluxWriteBenchmark):
    def __init__(self):
        super().__init__(5*1000*1000)


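# Because main() below resolves "-b <name>" by looking up "<name>Benchmark" in
# globals(), adding a new write volume is just another two-line subclass. A
# hedged sketch (the 50k class is hypothetical, not part of the original):
#
# class Influx50kWriteBenchmark(InfluxWriteBenchmark):
#     def __init__(self):
#         super().__init__(50*1000)
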
def _buildCmdLineParser():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Performance Benchmarking Tool
            ---------------------------------------------------------------------

            '''))

    parser.add_argument(
        '-b',
        '--benchmark-name',
        action='store',
        default='Taos1kQuery',
        type=str,
        help='Benchmark to use (default: Taos1kQuery)')

    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')

    parser.add_argument(
        '-i',
        '--iterate-directly',
        action='store_true',
        help='Execute operations directly, without a sub-process (default: false)')

    parser.add_argument(
        '-l',
        '--loop-count',
        action='store',
        default=1000,
        type=int,
        help='Number of loops to perform, 100 operations per loop. (default: 1000)')

    parser.add_argument(
        '-n',
        '--target-table-name',
        action='store',
        default=None,
        type=str,
        help='Regular table name in target DB (default: None)')

    parser.add_argument(
        '-s',
        '--subprocess-count',
        action='store',
        default=4,
        type=int,
        help='Number of sub-processes to spawn. (default: 4)')

    parser.add_argument(
        '-t',
        '--target-database',
        action='store',
        default='taos',
        type=str,
        help='Benchmark target: taos, influx (default: taos)')

    return parser

def main():
    parser = _buildCmdLineParser()
    Config.init(parser)
    Logging.clsInit(Config.getConfig().debug)
    Dice.seed(0) # initial seeding of dice

    bName = Config.getConfig().benchmark_name
    bClassName = bName + 'Benchmark'
    if bClassName in globals():
        bClass = globals()[bClassName]
        bm = bClass() # Benchmark object
        bm.run()
    else:
        raise PerfGenError("No such benchmark: {}".format(bName))

    # bm = Benchmark.create(Config.getConfig().target_database)
    # bm.run()


if __name__ == "__main__":
    main()

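# Example invocations (illustrative flag values; the flags come from
# _buildCmdLineParser() above, and perf_gen.py is this file's name as used by
# the launcher script below):
#
#   python3 perf_gen.py -b Influx10kWrite -d
#   python3 perf_gen.py -b Taos1kQuery -l 5000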
@@ -0,0 +1,60 @@
#!/bin/bash

# This is the script for running the TDengine performance benchmark generator (perf_gen.py)
#
# PREPARATION
#
# 1. Build and compile the TDengine source code that comes with this script, in the same directory tree
# 2. Please follow the directions in our README.md, and build TDengine in the build/ directory
# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg
# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg
# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above
# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils
#
# RUNNING THIS SCRIPT
#
# This script assumes the source code directory is intact, and that the binaries have been built in the
# build/ directory; as such, we will load the Python libraries in the directory tree, and also load
# the TDengine client shared library (.so) file in the build/ directory, as evidenced in the env
# variables below.
#
# Running the script is simple: no parameter is needed (for now, but this will change in the future).
#
# Happy Benchmarking...

# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory
EXEC_DIR=$(dirname "$0")
if [[ $EXEC_DIR != "." ]]
then
    echo "ERROR: Please execute $(basename "$0") in its own directory (for now anyway, pardon the dust)"
    exit 1
fi

CURR_DIR=$(pwd)
IN_TDINTERNAL="community"
if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then
    TAOS_DIR=$CURR_DIR/../../..
    TAOSD_DIR=$(find "$TAOS_DIR" -name "taosd" | grep bin | head -n1)
    LIB_DIR=$(echo "$TAOSD_DIR" | rev | cut -d '/' -f 3,4,5,6,7 | rev)/lib
else
    TAOS_DIR=$CURR_DIR/../..
    TAOSD_DIR=$(find "$TAOS_DIR" -name "taosd" | grep bin | head -n1)
    LIB_DIR=$(echo "$TAOSD_DIR" | rev | cut -d '/' -f 3,4,5,6 | rev)/lib
fi

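# How the rev|cut|rev pipeline above works: it reverses the taosd path, keeps
# the 4 (or 5, for the internal tree) path components that sit above bin/taosd,
# and reverses back -- i.e. it strips the trailing "bin/taosd" so the sibling
# lib/ directory can be appended. Illustrative (hypothetical) path, assuming
# the build tree depth matches:
#   .../debug/build/bin/taosd  ->  LIB_DIR=.../debug/build/lib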

# Now getting ready to execute Python
# The following is the default of our standard dev env (Ubuntu 20.04), modify/adjust at your own risk
PYTHON_EXEC=python3.8

# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3:$(pwd)

# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR

# Now we are all set, let's run the benchmark. Note we pass along all params
PERF_GEN_EXEC=perf_gen.py
$PYTHON_EXEC $PERF_GEN_EXEC "$@"
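
# Example invocation (illustrative; assuming this script is saved as perf_gen.sh
# and run from its own directory, with flag values defined in perf_gen.py):
#   ./perf_gen.sh -b Influx10kWrite -d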