Added -c option to Crash_Gen, enabling REST, but encountered error
This commit is contained in:
parent
92a89f751a
commit
88ac7bb6df
|
@ -30,6 +30,8 @@ import time
|
|||
import logging
|
||||
import datetime
|
||||
import textwrap
|
||||
import requests
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from typing import List
|
||||
from typing import Dict
|
||||
|
@ -76,7 +78,8 @@ class WorkerThread:
|
|||
|
||||
# Let us have a DB connection of our own
|
||||
if ( gConfig.per_thread_db_connection ): # type: ignore
|
||||
self._dbConn = DbConn()
|
||||
# print("connector_type = {}".format(gConfig.connector_type))
|
||||
self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest()
|
||||
|
||||
self._dbInUse = False # if "use db" was executed already
|
||||
|
||||
|
@ -434,15 +437,151 @@ class LinearQueue():
|
|||
return ret
|
||||
|
||||
class DbConn:
|
||||
TYPE_NATIVE = "native-c"
|
||||
TYPE_REST = "rest-api"
|
||||
TYPE_INVALID = "invalid"
|
||||
|
||||
@classmethod
|
||||
def create(cls, connType):
|
||||
if connType == cls.TYPE_NATIVE:
|
||||
return DbConnNative()
|
||||
elif connType == cls.TYPE_REST:
|
||||
return DbConnRest()
|
||||
else:
|
||||
raise RuntimeError("Unexpected connection type: {}".format(connType))
|
||||
|
||||
@classmethod
|
||||
def createNative(cls):
|
||||
return cls.create(cls.TYPE_NATIVE)
|
||||
|
||||
@classmethod
|
||||
def createRest(cls):
|
||||
return cls.create(cls.TYPE_REST)
|
||||
|
||||
def __init__(self):
|
||||
self._conn = None
|
||||
self._cursor = None
|
||||
self.isOpen = False
|
||||
|
||||
def open(self): # Open connection
|
||||
self._type = self.TYPE_INVALID
|
||||
|
||||
def open(self):
|
||||
if ( self.isOpen ):
|
||||
raise RuntimeError("Cannot re-open an existing DB connection")
|
||||
|
||||
# below implemented by child classes
|
||||
self.openByType()
|
||||
|
||||
logger.debug("[DB] data connection opened, type = {}".format(self._type))
|
||||
self.isOpen = True
|
||||
|
||||
def resetDb(self): # reset the whole database, etc.
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot reset database until connection is open")
|
||||
# self._tdSql.prepare() # Recreate database, etc.
|
||||
|
||||
self.execute('drop database if exists db')
|
||||
logger.debug("Resetting DB, dropped database")
|
||||
# self._cursor.execute('create database db')
|
||||
# self._cursor.execute('use db')
|
||||
# tdSql.execute('show databases')
|
||||
|
||||
def queryScalar(self, sql) -> int :
|
||||
return self._queryAny(sql)
|
||||
|
||||
def queryString(self, sql) -> str :
|
||||
return self._queryAny(sql)
|
||||
|
||||
def _queryAny(self, sql) : # actual query result as an int
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot query database until connection is open")
|
||||
nRows = self.query(sql)
|
||||
if nRows != 1 :
|
||||
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
|
||||
if self.getResultRows() != 1 or self.getResultCols() != 1:
|
||||
raise RuntimeError("Unexpected result set for query: {}".format(sql))
|
||||
return self.getQueryResult()[0][0]
|
||||
|
||||
def execute(self, sql):
|
||||
raise RuntimeError("Unexpected execution, should be overriden")
|
||||
def openByType(self):
|
||||
raise RuntimeError("Unexpected execution, should be overriden")
|
||||
def getQueryResult(self):
|
||||
raise RuntimeError("Unexpected execution, should be overriden")
|
||||
def getResultRows(self):
|
||||
raise RuntimeError("Unexpected execution, should be overriden")
|
||||
def getResultCols(self):
|
||||
raise RuntimeError("Unexpected execution, should be overriden")
|
||||
|
||||
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
|
||||
class DbConnRest(DbConn):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._type = self.TYPE_REST
|
||||
self._url = "http://localhost:6020/rest/sql" # fixed for now
|
||||
self._result = None
|
||||
|
||||
def openByType(self): # Open connection
|
||||
pass # do nothing, always open
|
||||
|
||||
def close(self):
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot clean up database until connection is open")
|
||||
# Do nothing for REST
|
||||
logger.debug("[DB] REST Database connection closed")
|
||||
self.isOpen = False
|
||||
|
||||
def _doSql(self, sql):
|
||||
r = requests.post(self._url,
|
||||
data = sql,
|
||||
auth = HTTPBasicAuth('root', 'taosdata'))
|
||||
rj = r.json()
|
||||
# Sanity check for the "Json Result"
|
||||
if (not 'status' in rj):
|
||||
raise RuntimeError("No status in REST response")
|
||||
|
||||
if rj['status'] == 'error': # clearly reported error
|
||||
if (not 'code' in rj): # error without code
|
||||
raise RuntimeError("REST error return without code")
|
||||
errno = rj['code'] # May need to massage this in the future
|
||||
# print("Raising programming error with REST return: {}".format(rj))
|
||||
raise taos.error.ProgrammingError(rj['desc'], errno) # todo: check existance of 'desc'
|
||||
|
||||
if rj['status'] != 'succ': # better be this
|
||||
raise RuntimeError("Unexpected REST return status: {}".format(rj['status']))
|
||||
|
||||
nRows = rj['rows'] if ('rows' in rj) else 0
|
||||
self._result = rj
|
||||
return nRows
|
||||
|
||||
def execute(self, sql):
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot execute database commands until connection is open")
|
||||
logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
|
||||
nRows = self._doSql(sql)
|
||||
logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
|
||||
return nRows
|
||||
|
||||
def query(self, sql) : # return rows affected
|
||||
return self.execute(sql)
|
||||
|
||||
def getQueryResult(self):
|
||||
return self._result['data']
|
||||
|
||||
def getResultRows(self):
|
||||
print(self._result)
|
||||
raise RuntimeError("TBD")
|
||||
# return self._tdSql.queryRows
|
||||
|
||||
def getResultCols(self):
|
||||
print(self._result)
|
||||
raise RuntimeError("TBD")
|
||||
|
||||
class DbConnNative(DbConn):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._type = self.TYPE_REST
|
||||
self._conn = None
|
||||
self._cursor = None
|
||||
|
||||
def openByType(self): # Open connection
|
||||
cfgPath = "../../build/test/cfg"
|
||||
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
|
||||
self._cursor = self._conn.cursor()
|
||||
|
@ -454,21 +593,7 @@ class DbConn:
|
|||
# Open connection
|
||||
self._tdSql = TDSql()
|
||||
self._tdSql.init(self._cursor)
|
||||
logger.debug("[DB] data connection opened")
|
||||
self.isOpen = True
|
||||
|
||||
def resetDb(self): # reset the whole database, etc.
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot reset database until connection is open")
|
||||
# self._tdSql.prepare() # Recreate database, etc.
|
||||
|
||||
self._cursor.execute('drop database if exists db')
|
||||
logger.debug("Resetting DB, dropped database")
|
||||
# self._cursor.execute('create database db')
|
||||
# self._cursor.execute('use db')
|
||||
|
||||
# tdSql.execute('show databases')
|
||||
|
||||
|
||||
def close(self):
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot clean up database until connection is open")
|
||||
|
@ -496,22 +621,12 @@ class DbConn:
|
|||
def getQueryResult(self):
|
||||
return self._tdSql.queryResult
|
||||
|
||||
def _queryAny(self, sql) : # actual query result as an int
|
||||
if ( not self.isOpen ):
|
||||
raise RuntimeError("Cannot query database until connection is open")
|
||||
tSql = self._tdSql
|
||||
nRows = tSql.query(sql)
|
||||
if nRows != 1 :
|
||||
raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
|
||||
if tSql.queryRows != 1 or tSql.queryCols != 1:
|
||||
raise RuntimeError("Unexpected result set for query: {}".format(sql))
|
||||
return tSql.queryResult[0][0]
|
||||
def getResultRows(self):
|
||||
return self._tdSql.queryRows
|
||||
|
||||
def queryScalar(self, sql) -> int :
|
||||
return self._queryAny(sql)
|
||||
def getResultCols(self):
|
||||
return self._tdSql.queryCols
|
||||
|
||||
def queryString(self, sql) -> str :
|
||||
return self._queryAny(sql)
|
||||
|
||||
class AnyState:
|
||||
STATE_INVALID = -1
|
||||
|
@ -859,7 +974,7 @@ class DbManager():
|
|||
self._lock = threading.RLock()
|
||||
|
||||
# self.openDbServerConnection()
|
||||
self._dbConn = DbConn()
|
||||
self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest()
|
||||
try:
|
||||
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
|
||||
except taos.error.ProgrammingError as err:
|
||||
|
@ -1013,8 +1128,10 @@ class Task():
|
|||
try:
|
||||
self._executeInternal(te, wt) # TODO: no return value?
|
||||
except taos.error.ProgrammingError as err:
|
||||
errno2 = 0x80000000 + err.errno # positive error number
|
||||
if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600 ]) : # allowed errors
|
||||
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
|
||||
if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
|
||||
1000 # REST catch-all error
|
||||
]) : # allowed errors
|
||||
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
|
||||
print("_", end="", flush=True)
|
||||
self._err = err
|
||||
|
@ -1239,8 +1356,8 @@ class TaskDropSuperTable(StateTransitionTask):
|
|||
regTableName = self.getRegTableName(i); # "db.reg_table_{}".format(i)
|
||||
try:
|
||||
self.execWtSql(wt, "drop table {}".format(regTableName)) # nRows always 0, like MySQL
|
||||
except taos.error.ProgrammingError as err:
|
||||
errno2 = 0x80000000 + err.errno # positive error number
|
||||
except taos.error.ProgrammingError as err:
|
||||
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correcting for strange error number scheme
|
||||
if ( errno2 in [0x362]) : # mnode invalid table name
|
||||
isSuccess = False
|
||||
logger.debug("[DB] Acceptable error when dropping a table")
|
||||
|
@ -1400,11 +1517,6 @@ class LoggingFilter(logging.Filter):
|
|||
if ( record.levelno >= logging.INFO ) :
|
||||
return True # info or above always log
|
||||
|
||||
|
||||
|
||||
# print("type = {}, value={}".format(type(msg), msg))
|
||||
# sys.exit()
|
||||
|
||||
# Commenting out below to adjust...
|
||||
|
||||
# if msg.startswith("[TRD]"):
|
||||
|
@ -1490,6 +1602,8 @@ def main():
|
|||
|
||||
'''))
|
||||
|
||||
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
|
||||
help='Connector type to use: native, rest, or mixed (default: 10)')
|
||||
parser.add_argument('-d', '--debug', action='store_true',
|
||||
help='Turn on DEBUG mode for more logging (default: false)')
|
||||
parser.add_argument('-e', '--run-tdengine', action='store_true',
|
||||
|
|
Loading…
Reference in New Issue