tbase-1470: python2&3, windows
This commit is contained in:
parent
f12c270e69
commit
a7eaa8ad7e
|
@ -13,14 +13,14 @@ def _convert_microsecond_to_datetime(micro):
|
||||||
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C bool row to python row
|
"""Function to convert C bool row to python row
|
||||||
"""
|
"""
|
||||||
_timstamp_converter = _convert_millisecond_to_datetime
|
_timestamp_converter = _convert_millisecond_to_datetime
|
||||||
if micro:
|
if micro:
|
||||||
_timstamp_converter = _convert_microsecond_to_datetime
|
_timestamp_converter = _convert_microsecond_to_datetime
|
||||||
|
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1]))
|
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1]))
|
||||||
else:
|
else:
|
||||||
return list(map(_timstamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
|
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
|
||||||
|
|
||||||
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C bool row to python row
|
"""Function to convert C bool row to python row
|
||||||
|
@ -144,6 +144,8 @@ class CTaosInterface(object):
|
||||||
libtaos.taos_use_result.restype = ctypes.c_void_p
|
libtaos.taos_use_result.restype = ctypes.c_void_p
|
||||||
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
|
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
|
||||||
libtaos.taos_errstr.restype = ctypes.c_char_p
|
libtaos.taos_errstr.restype = ctypes.c_char_p
|
||||||
|
libtaos.taos_subscribe.restype = ctypes.c_void_p
|
||||||
|
libtaos.taos_consume.restype = ctypes.c_void_p
|
||||||
|
|
||||||
def __init__(self, config=None):
|
def __init__(self, config=None):
|
||||||
'''
|
'''
|
||||||
|
@ -252,6 +254,41 @@ class CTaosInterface(object):
|
||||||
"""
|
"""
|
||||||
return CTaosInterface.libtaos.taos_affected_rows(connection)
|
return CTaosInterface.libtaos.taos_affected_rows(connection)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def subscribe(connection, restart, topic, sql, interval):
|
||||||
|
"""Create a subscription
|
||||||
|
@restart boolean,
|
||||||
|
@sql string, sql statement for data query, must be a 'select' statement.
|
||||||
|
@topic string, name of this subscription
|
||||||
|
"""
|
||||||
|
return ctypes.c_void_p(CTaosInterface.libtaos.taos_subscribe(
|
||||||
|
connection,
|
||||||
|
1 if restart else 0,
|
||||||
|
ctypes.c_char_p(topic.encode('utf-8')),
|
||||||
|
ctypes.c_char_p(sql.encode('utf-8')),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
interval))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def consume(sub):
|
||||||
|
"""Consume data of a subscription
|
||||||
|
"""
|
||||||
|
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_consume(sub))
|
||||||
|
fields = []
|
||||||
|
pfields = CTaosInterface.fetchFields(result)
|
||||||
|
for i in range(CTaosInterface.libtaos.taos_num_fields(result)):
|
||||||
|
fields.append({'name': pfields[i].name.decode('utf-8'),
|
||||||
|
'bytes': pfields[i].bytes,
|
||||||
|
'type': ord(pfields[i].type)})
|
||||||
|
return result, fields
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def unsubscribe(sub, keepProgress):
|
||||||
|
"""Cancel a subscription
|
||||||
|
"""
|
||||||
|
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def useResult(connection):
|
def useResult(connection):
|
||||||
'''Use result after calling self.query
|
'''Use result after calling self.query
|
||||||
|
@ -275,8 +312,8 @@ class CTaosInterface(object):
|
||||||
if num_of_rows == 0:
|
if num_of_rows == 0:
|
||||||
return None, 0
|
return None, 0
|
||||||
|
|
||||||
blocks = [None] * len(fields)
|
|
||||||
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
|
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
|
||||||
|
blocks = [None] * len(fields)
|
||||||
for i in range(len(fields)):
|
for i in range(len(fields)):
|
||||||
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
|
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
|
||||||
|
|
||||||
|
@ -351,4 +388,20 @@ class CTaosInterface(object):
|
||||||
def errStr(connection):
|
def errStr(connection):
|
||||||
"""Return the error styring
|
"""Return the error styring
|
||||||
"""
|
"""
|
||||||
return CTaosInterface.libtaos.taos_errstr(connection)
|
return CTaosInterface.libtaos.taos_errstr(connection)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
cinter = CTaosInterface()
|
||||||
|
conn = cinter.connect()
|
||||||
|
|
||||||
|
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
|
||||||
|
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
|
||||||
|
|
||||||
|
result, des = CTaosInterface.useResult(conn)
|
||||||
|
|
||||||
|
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
|
||||||
|
|
||||||
|
print(data)
|
||||||
|
|
||||||
|
cinter.close(conn)
|
|
@ -1,5 +1,5 @@
|
||||||
# from .cursor import TDengineCursor
|
|
||||||
from .cursor import TDengineCursor
|
from .cursor import TDengineCursor
|
||||||
|
from .subscription import TDengineSubscription
|
||||||
from .cinterface import CTaosInterface
|
from .cinterface import CTaosInterface
|
||||||
|
|
||||||
class TDengineConnection(object):
|
class TDengineConnection(object):
|
||||||
|
@ -15,7 +15,8 @@ class TDengineConnection(object):
|
||||||
self._config = None
|
self._config = None
|
||||||
self._chandle = None
|
self._chandle = None
|
||||||
|
|
||||||
self.config(**kwargs)
|
if len(kwargs) > 0:
|
||||||
|
self.config(**kwargs)
|
||||||
|
|
||||||
def config(self, **kwargs):
|
def config(self, **kwargs):
|
||||||
# host
|
# host
|
||||||
|
@ -50,6 +51,14 @@ class TDengineConnection(object):
|
||||||
"""
|
"""
|
||||||
return CTaosInterface.close(self._conn)
|
return CTaosInterface.close(self._conn)
|
||||||
|
|
||||||
|
def subscribe(self, restart, topic, sql, interval):
|
||||||
|
"""Create a subscription.
|
||||||
|
"""
|
||||||
|
if self._conn is None:
|
||||||
|
return None
|
||||||
|
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval)
|
||||||
|
return TDengineSubscription(sub)
|
||||||
|
|
||||||
def cursor(self):
|
def cursor(self):
|
||||||
"""Return a new Cursor object using the connection.
|
"""Return a new Cursor object using the connection.
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
from .cinterface import CTaosInterface
|
||||||
|
from .error import *
|
||||||
|
|
||||||
|
class TDengineSubscription(object):
|
||||||
|
"""TDengine subscription object
|
||||||
|
"""
|
||||||
|
def __init__(self, sub):
|
||||||
|
self._sub = sub
|
||||||
|
|
||||||
|
|
||||||
|
def consume(self):
|
||||||
|
"""Consume rows of a subscription
|
||||||
|
"""
|
||||||
|
if self._sub is None:
|
||||||
|
raise OperationalError("Invalid use of consume")
|
||||||
|
|
||||||
|
result, fields = CTaosInterface.consume(self._sub)
|
||||||
|
buffer = [[] for i in range(len(fields))]
|
||||||
|
print(buffer)
|
||||||
|
while True:
|
||||||
|
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
|
||||||
|
if num_of_fields == 0: break
|
||||||
|
for i in range(len(fields)):
|
||||||
|
buffer[i].extend(block[i])
|
||||||
|
|
||||||
|
return list(map(tuple, zip(*buffer)))
|
||||||
|
|
||||||
|
|
||||||
|
def close(self, keepProgress = True):
|
||||||
|
"""Close the Subscription.
|
||||||
|
"""
|
||||||
|
if self._sub is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
CTaosInterface.unsubscribe(self._sub, keepProgress)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
from .connection import TDengineConnection
|
||||||
|
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
|
||||||
|
|
||||||
|
# Generate a cursor object to run SQL commands
|
||||||
|
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
|
||||||
|
|
||||||
|
for i in range(0,10):
|
||||||
|
data = sub.consume()
|
||||||
|
for d in data:
|
||||||
|
print(d)
|
||||||
|
|
||||||
|
sub.close()
|
||||||
|
conn.close()
|
|
@ -1,370 +1,407 @@
|
||||||
import ctypes
|
import ctypes
|
||||||
from .constants import FieldType
|
from .constants import FieldType
|
||||||
from .error import *
|
from .error import *
|
||||||
import math
|
import math
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
def _convert_millisecond_to_datetime(milli):
|
def _convert_millisecond_to_datetime(milli):
|
||||||
return datetime.datetime.fromtimestamp(milli/1000.0)
|
return datetime.datetime.fromtimestamp(milli/1000.0)
|
||||||
|
|
||||||
def _convert_microsecond_to_datetime(micro):
|
def _convert_microsecond_to_datetime(micro):
|
||||||
return datetime.datetime.fromtimestamp(micro/1000000.0)
|
return datetime.datetime.fromtimestamp(micro/1000000.0)
|
||||||
|
|
||||||
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_timestamp_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C bool row to python row
|
"""Function to convert C bool row to python row
|
||||||
"""
|
"""
|
||||||
_timestamp_converter = _convert_millisecond_to_datetime
|
_timestamp_converter = _convert_millisecond_to_datetime
|
||||||
if micro:
|
if micro:
|
||||||
_timestamp_converter = _convert_microsecond_to_datetime
|
_timestamp_converter = _convert_microsecond_to_datetime
|
||||||
|
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1]))
|
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1]))
|
||||||
else:
|
else:
|
||||||
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
|
return list(map(_timestamp_converter, ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)]))
|
||||||
|
|
||||||
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_bool_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C bool row to python row
|
"""Function to convert C bool row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
|
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
|
||||||
else:
|
else:
|
||||||
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
|
return [ None if ele == FieldType.C_BOOL_NULL else bool(ele) for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_tinyint_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C tinyint row to python row
|
"""Function to convert C tinyint row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
|
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)][::-1] ]
|
||||||
else:
|
else:
|
||||||
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
|
return [ None if ele == FieldType.C_TINYINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_smallint_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C smallint row to python row
|
"""Function to convert C smallint row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
|
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)][::-1]]
|
||||||
else:
|
else:
|
||||||
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
|
return [ None if ele == FieldType.C_SMALLINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_int_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C int row to python row
|
"""Function to convert C int row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
|
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)][::-1] ]
|
||||||
else:
|
else:
|
||||||
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
|
return [ None if ele == FieldType.C_INT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_bigint_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C bigint row to python row
|
"""Function to convert C bigint row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1] ]
|
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)][::-1] ]
|
||||||
else:
|
else:
|
||||||
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
|
return [ None if ele == FieldType.C_BIGINT_NULL else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_longlong))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_float_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C float row to python row
|
"""Function to convert C float row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
|
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)][::-1] ]
|
||||||
else:
|
else:
|
||||||
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
|
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_double_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C double row to python row
|
"""Function to convert C double row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
|
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)][::-1] ]
|
||||||
else:
|
else:
|
||||||
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
|
return [ None if math.isnan(ele) else ele for ele in ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[:abs(num_of_rows)] ]
|
||||||
|
|
||||||
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_binary_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C binary row to python row
|
"""Function to convert C binary row to python row
|
||||||
"""
|
"""
|
||||||
if num_of_rows > 0:
|
if num_of_rows > 0:
|
||||||
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
|
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)][::-1]]
|
||||||
else:
|
else:
|
||||||
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
|
return [ None if ele.value[0:1] == FieldType.C_BINARY_NULL else ele.value.decode('utf-8') for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_char * nbytes)))[:abs(num_of_rows)]]
|
||||||
|
|
||||||
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
|
def _crow_nchar_to_python(data, num_of_rows, nbytes=None, micro=False):
|
||||||
"""Function to convert C nchar row to python row
|
"""Function to convert C nchar row to python row
|
||||||
"""
|
"""
|
||||||
assert(nbytes is not None)
|
assert(nbytes is not None)
|
||||||
|
|
||||||
res = []
|
res = []
|
||||||
|
|
||||||
for i in range(abs(num_of_rows)):
|
for i in range(abs(num_of_rows)):
|
||||||
try:
|
try:
|
||||||
if num_of_rows >= 0:
|
if num_of_rows >= 0:
|
||||||
res.append( (ctypes.cast(data+nbytes*(abs(num_of_rows - i -1)), ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
|
res.append( (ctypes.cast(data+nbytes*(abs(num_of_rows - i -1)), ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
|
||||||
else:
|
else:
|
||||||
res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
|
res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
|
||||||
except ValueError:
|
except ValueError:
|
||||||
res.append(None)
|
res.append(None)
|
||||||
|
|
||||||
return res
|
return res
|
||||||
# if num_of_rows > 0:
|
# if num_of_rows > 0:
|
||||||
# for i in range(abs(num_of_rows)):
|
# for i in range(abs(num_of_rows)):
|
||||||
# try:
|
# try:
|
||||||
# res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
|
# res.append( (ctypes.cast(data+nbytes*i, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[0].value )
|
||||||
# except ValueError:
|
# except ValueError:
|
||||||
# res.append(None)
|
# res.append(None)
|
||||||
# return res
|
# return res
|
||||||
# # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]]
|
# # return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)][::-1]]
|
||||||
# else:
|
# else:
|
||||||
# return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]]
|
# return [ele.value for ele in (ctypes.cast(data, ctypes.POINTER(ctypes.c_wchar * (nbytes//4))))[:abs(num_of_rows)]]
|
||||||
|
|
||||||
_CONVERT_FUNC = {
|
_CONVERT_FUNC = {
|
||||||
FieldType.C_BOOL: _crow_bool_to_python,
|
FieldType.C_BOOL: _crow_bool_to_python,
|
||||||
FieldType.C_TINYINT : _crow_tinyint_to_python,
|
FieldType.C_TINYINT : _crow_tinyint_to_python,
|
||||||
FieldType.C_SMALLINT : _crow_smallint_to_python,
|
FieldType.C_SMALLINT : _crow_smallint_to_python,
|
||||||
FieldType.C_INT : _crow_int_to_python,
|
FieldType.C_INT : _crow_int_to_python,
|
||||||
FieldType.C_BIGINT : _crow_bigint_to_python,
|
FieldType.C_BIGINT : _crow_bigint_to_python,
|
||||||
FieldType.C_FLOAT : _crow_float_to_python,
|
FieldType.C_FLOAT : _crow_float_to_python,
|
||||||
FieldType.C_DOUBLE : _crow_double_to_python,
|
FieldType.C_DOUBLE : _crow_double_to_python,
|
||||||
FieldType.C_BINARY: _crow_binary_to_python,
|
FieldType.C_BINARY: _crow_binary_to_python,
|
||||||
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
|
FieldType.C_TIMESTAMP : _crow_timestamp_to_python,
|
||||||
FieldType.C_NCHAR : _crow_nchar_to_python
|
FieldType.C_NCHAR : _crow_nchar_to_python
|
||||||
}
|
}
|
||||||
|
|
||||||
# Corresponding TAOS_FIELD structure in C
|
# Corresponding TAOS_FIELD structure in C
|
||||||
class TaosField(ctypes.Structure):
|
class TaosField(ctypes.Structure):
|
||||||
_fields_ = [('name', ctypes.c_char * 64),
|
_fields_ = [('name', ctypes.c_char * 64),
|
||||||
('bytes', ctypes.c_short),
|
('bytes', ctypes.c_short),
|
||||||
('type', ctypes.c_char)]
|
('type', ctypes.c_char)]
|
||||||
|
|
||||||
# C interface class
|
# C interface class
|
||||||
class CTaosInterface(object):
|
class CTaosInterface(object):
|
||||||
|
|
||||||
libtaos = ctypes.windll.LoadLibrary('taos')
|
libtaos = ctypes.windll.LoadLibrary('taos')
|
||||||
|
|
||||||
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
|
libtaos.taos_fetch_fields.restype = ctypes.POINTER(TaosField)
|
||||||
libtaos.taos_init.restype = None
|
libtaos.taos_init.restype = None
|
||||||
libtaos.taos_connect.restype = ctypes.c_void_p
|
libtaos.taos_connect.restype = ctypes.c_void_p
|
||||||
libtaos.taos_use_result.restype = ctypes.c_void_p
|
libtaos.taos_use_result.restype = ctypes.c_void_p
|
||||||
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
|
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
|
||||||
libtaos.taos_errstr.restype = ctypes.c_char_p
|
libtaos.taos_errstr.restype = ctypes.c_char_p
|
||||||
|
libtaos.taos_subscribe.restype = ctypes.c_void_p
|
||||||
def __init__(self, config=None):
|
libtaos.taos_consume.restype = ctypes.c_void_p
|
||||||
'''
|
|
||||||
Function to initialize the class
|
def __init__(self, config=None):
|
||||||
@host : str, hostname to connect
|
'''
|
||||||
@user : str, username to connect to server
|
Function to initialize the class
|
||||||
@password : str, password to connect to server
|
@host : str, hostname to connect
|
||||||
@db : str, default db to use when log in
|
@user : str, username to connect to server
|
||||||
@config : str, config directory
|
@password : str, password to connect to server
|
||||||
|
@db : str, default db to use when log in
|
||||||
@rtype : None
|
@config : str, config directory
|
||||||
'''
|
|
||||||
if config is None:
|
@rtype : None
|
||||||
self._config = ctypes.c_char_p(None)
|
'''
|
||||||
else:
|
if config is None:
|
||||||
try:
|
self._config = ctypes.c_char_p(None)
|
||||||
self._config = ctypes.c_char_p(config.encode('utf-8'))
|
else:
|
||||||
except AttributeError:
|
try:
|
||||||
raise AttributeError("config is expected as a str")
|
self._config = ctypes.c_char_p(config.encode('utf-8'))
|
||||||
|
except AttributeError:
|
||||||
if config != None:
|
raise AttributeError("config is expected as a str")
|
||||||
CTaosInterface.libtaos.taos_options(3, self._config)
|
|
||||||
|
if config != None:
|
||||||
CTaosInterface.libtaos.taos_init()
|
CTaosInterface.libtaos.taos_options(3, self._config)
|
||||||
|
|
||||||
@property
|
CTaosInterface.libtaos.taos_init()
|
||||||
def config(self):
|
|
||||||
""" Get current config
|
@property
|
||||||
"""
|
def config(self):
|
||||||
return self._config
|
""" Get current config
|
||||||
|
"""
|
||||||
def connect(self, host=None, user="root", password="taosdata", db=None, port=0):
|
return self._config
|
||||||
'''
|
|
||||||
Function to connect to server
|
def connect(self, host=None, user="root", password="taosdata", db=None, port=0):
|
||||||
|
'''
|
||||||
@rtype: c_void_p, TDengine handle
|
Function to connect to server
|
||||||
'''
|
|
||||||
# host
|
@rtype: c_void_p, TDengine handle
|
||||||
try:
|
'''
|
||||||
_host = ctypes.c_char_p(host.encode(
|
# host
|
||||||
"utf-8")) if host != None else ctypes.c_char_p(None)
|
try:
|
||||||
except AttributeError:
|
_host = ctypes.c_char_p(host.encode(
|
||||||
raise AttributeError("host is expected as a str")
|
"utf-8")) if host != None else ctypes.c_char_p(None)
|
||||||
|
except AttributeError:
|
||||||
# user
|
raise AttributeError("host is expected as a str")
|
||||||
try:
|
|
||||||
_user = ctypes.c_char_p(user.encode("utf-8"))
|
# user
|
||||||
except AttributeError:
|
try:
|
||||||
raise AttributeError("user is expected as a str")
|
_user = ctypes.c_char_p(user.encode("utf-8"))
|
||||||
|
except AttributeError:
|
||||||
# password
|
raise AttributeError("user is expected as a str")
|
||||||
try:
|
|
||||||
_password = ctypes.c_char_p(password.encode("utf-8"))
|
# password
|
||||||
except AttributeError:
|
try:
|
||||||
raise AttributeError("password is expected as a str")
|
_password = ctypes.c_char_p(password.encode("utf-8"))
|
||||||
|
except AttributeError:
|
||||||
# db
|
raise AttributeError("password is expected as a str")
|
||||||
try:
|
|
||||||
_db = ctypes.c_char_p(
|
# db
|
||||||
db.encode("utf-8")) if db != None else ctypes.c_char_p(None)
|
try:
|
||||||
except AttributeError:
|
_db = ctypes.c_char_p(
|
||||||
raise AttributeError("db is expected as a str")
|
db.encode("utf-8")) if db != None else ctypes.c_char_p(None)
|
||||||
|
except AttributeError:
|
||||||
# port
|
raise AttributeError("db is expected as a str")
|
||||||
try:
|
|
||||||
_port = ctypes.c_int(port)
|
# port
|
||||||
except TypeError:
|
try:
|
||||||
raise TypeError("port is expected as an int")
|
_port = ctypes.c_int(port)
|
||||||
|
except TypeError:
|
||||||
connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect(
|
raise TypeError("port is expected as an int")
|
||||||
_host, _user, _password, _db, _port))
|
|
||||||
|
connection = ctypes.c_void_p(CTaosInterface.libtaos.taos_connect(
|
||||||
if connection.value == None:
|
_host, _user, _password, _db, _port))
|
||||||
print('connect to TDengine failed')
|
|
||||||
# sys.exit(1)
|
if connection.value == None:
|
||||||
else:
|
print('connect to TDengine failed')
|
||||||
print('connect to TDengine success')
|
# sys.exit(1)
|
||||||
|
else:
|
||||||
return connection
|
print('connect to TDengine success')
|
||||||
|
|
||||||
@staticmethod
|
return connection
|
||||||
def close(connection):
|
|
||||||
'''Close the TDengine handle
|
@staticmethod
|
||||||
'''
|
def close(connection):
|
||||||
CTaosInterface.libtaos.taos_close(connection)
|
'''Close the TDengine handle
|
||||||
print('connection is closed')
|
'''
|
||||||
|
CTaosInterface.libtaos.taos_close(connection)
|
||||||
@staticmethod
|
print('connection is closed')
|
||||||
def query(connection, sql):
|
|
||||||
'''Run SQL
|
@staticmethod
|
||||||
|
def query(connection, sql):
|
||||||
@sql: str, sql string to run
|
'''Run SQL
|
||||||
|
|
||||||
@rtype: 0 on success and -1 on failure
|
@sql: str, sql string to run
|
||||||
'''
|
|
||||||
try:
|
@rtype: 0 on success and -1 on failure
|
||||||
return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8')))
|
'''
|
||||||
except AttributeError:
|
try:
|
||||||
raise AttributeError("sql is expected as a string")
|
return CTaosInterface.libtaos.taos_query(connection, ctypes.c_char_p(sql.encode('utf-8')))
|
||||||
# finally:
|
except AttributeError:
|
||||||
# CTaosInterface.libtaos.close(connection)
|
raise AttributeError("sql is expected as a string")
|
||||||
|
# finally:
|
||||||
@staticmethod
|
# CTaosInterface.libtaos.close(connection)
|
||||||
def affectedRows(connection):
|
|
||||||
"""The affected rows after runing query
|
@staticmethod
|
||||||
"""
|
def affectedRows(connection):
|
||||||
return CTaosInterface.libtaos.taos_affected_rows(connection)
|
"""The affected rows after runing query
|
||||||
|
"""
|
||||||
@staticmethod
|
return CTaosInterface.libtaos.taos_affected_rows(connection)
|
||||||
def useResult(connection):
|
|
||||||
'''Use result after calling self.query
|
@staticmethod
|
||||||
'''
|
def subscribe(connection, restart, topic, sql, interval):
|
||||||
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
|
"""Create a subscription
|
||||||
fields = []
|
@restart boolean,
|
||||||
pfields = CTaosInterface.fetchFields(result)
|
@sql string, sql statement for data query, must be a 'select' statement.
|
||||||
for i in range(CTaosInterface.fieldsCount(connection)):
|
@topic string, name of this subscription
|
||||||
fields.append({'name': pfields[i].name.decode('utf-8'),
|
"""
|
||||||
'bytes': pfields[i].bytes,
|
return ctypes.c_void_p(CTaosInterface.libtaos.taos_subscribe(
|
||||||
'type': ord(pfields[i].type)})
|
connection,
|
||||||
|
1 if restart else 0,
|
||||||
return result, fields
|
ctypes.c_char_p(topic.encode('utf-8')),
|
||||||
|
ctypes.c_char_p(sql.encode('utf-8')),
|
||||||
@staticmethod
|
None,
|
||||||
def fetchBlock(result, fields):
|
None,
|
||||||
pblock = ctypes.c_void_p(0)
|
interval))
|
||||||
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
|
|
||||||
result, ctypes.byref(pblock))
|
@staticmethod
|
||||||
|
def consume(sub):
|
||||||
if num_of_rows == 0:
|
"""Consume data of a subscription
|
||||||
return None, 0
|
"""
|
||||||
|
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_consume(sub))
|
||||||
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
|
fields = []
|
||||||
blocks = [None] * len(fields)
|
pfields = CTaosInterface.fetchFields(result)
|
||||||
for i in range(len(fields)):
|
for i in range(CTaosInterface.libtaos.taos_num_fields(result)):
|
||||||
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
|
fields.append({'name': pfields[i].name.decode('utf-8'),
|
||||||
|
'bytes': pfields[i].bytes,
|
||||||
if fields[i]['type'] not in _CONVERT_FUNC:
|
'type': ord(pfields[i].type)})
|
||||||
raise DatabaseError("Invalid data type returned from database")
|
return result, fields
|
||||||
|
|
||||||
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro)
|
@staticmethod
|
||||||
|
def unsubscribe(sub, keepProgress):
|
||||||
return blocks, abs(num_of_rows)
|
"""Cancel a subscription
|
||||||
|
"""
|
||||||
@staticmethod
|
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
|
||||||
def freeResult(result):
|
|
||||||
CTaosInterface.libtaos.taos_free_result(result)
|
@staticmethod
|
||||||
result.value = None
|
def useResult(connection):
|
||||||
|
'''Use result after calling self.query
|
||||||
@staticmethod
|
'''
|
||||||
def fieldsCount(connection):
|
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_use_result(connection))
|
||||||
return CTaosInterface.libtaos.taos_field_count(connection)
|
fields = []
|
||||||
|
pfields = CTaosInterface.fetchFields(result)
|
||||||
@staticmethod
|
for i in range(CTaosInterface.fieldsCount(connection)):
|
||||||
def fetchFields(result):
|
fields.append({'name': pfields[i].name.decode('utf-8'),
|
||||||
return CTaosInterface.libtaos.taos_fetch_fields(result)
|
'bytes': pfields[i].bytes,
|
||||||
|
'type': ord(pfields[i].type)})
|
||||||
# @staticmethod
|
|
||||||
# def fetchRow(result, fields):
|
return result, fields
|
||||||
# l = []
|
|
||||||
# row = CTaosInterface.libtaos.taos_fetch_row(result)
|
@staticmethod
|
||||||
# if not row:
|
def fetchBlock(result, fields):
|
||||||
# return None
|
pblock = ctypes.c_void_p(0)
|
||||||
|
num_of_rows = CTaosInterface.libtaos.taos_fetch_block(
|
||||||
# for i in range(len(fields)):
|
result, ctypes.byref(pblock))
|
||||||
# l.append(CTaosInterface.getDataValue(
|
|
||||||
# row[i], fields[i]['type'], fields[i]['bytes']))
|
if num_of_rows == 0:
|
||||||
|
return None, 0
|
||||||
# return tuple(l)
|
|
||||||
|
isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
|
||||||
# @staticmethod
|
blocks = [None] * len(fields)
|
||||||
# def getDataValue(data, dtype, byte):
|
for i in range(len(fields)):
|
||||||
# '''
|
data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
|
||||||
# '''
|
|
||||||
# if not data:
|
if fields[i]['type'] not in _CONVERT_FUNC:
|
||||||
# return None
|
raise DatabaseError("Invalid data type returned from database")
|
||||||
|
|
||||||
# if (dtype == CTaosInterface.TSDB_DATA_TYPE_BOOL):
|
blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro)
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[0]
|
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TINYINT):
|
return blocks, abs(num_of_rows)
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[0]
|
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_SMALLINT):
|
@staticmethod
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[0]
|
def freeResult(result):
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
|
CTaosInterface.libtaos.taos_free_result(result)
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
|
result.value = None
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
|
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
|
@staticmethod
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
|
def fieldsCount(connection):
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
|
return CTaosInterface.libtaos.taos_field_count(connection)
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
|
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[0]
|
@staticmethod
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
|
def fetchFields(result):
|
||||||
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
|
return CTaosInterface.libtaos.taos_fetch_fields(result)
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
|
|
||||||
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
|
# @staticmethod
|
||||||
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
|
# def fetchRow(result, fields):
|
||||||
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
|
# l = []
|
||||||
|
# row = CTaosInterface.libtaos.taos_fetch_row(result)
|
||||||
@staticmethod
|
# if not row:
|
||||||
def errno(connection):
|
# return None
|
||||||
"""Return the error number.
|
|
||||||
"""
|
# for i in range(len(fields)):
|
||||||
return CTaosInterface.libtaos.taos_errno(connection)
|
# l.append(CTaosInterface.getDataValue(
|
||||||
|
# row[i], fields[i]['type'], fields[i]['bytes']))
|
||||||
@staticmethod
|
|
||||||
def errStr(connection):
|
# return tuple(l)
|
||||||
"""Return the error styring
|
|
||||||
"""
|
# @staticmethod
|
||||||
return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8')
|
# def getDataValue(data, dtype, byte):
|
||||||
|
# '''
|
||||||
|
# '''
|
||||||
if __name__ == '__main__':
|
# if not data:
|
||||||
cinter = CTaosInterface()
|
# return None
|
||||||
conn = cinter.connect()
|
|
||||||
|
# if (dtype == CTaosInterface.TSDB_DATA_TYPE_BOOL):
|
||||||
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_bool))[0]
|
||||||
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TINYINT):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_byte))[0]
|
||||||
result, des = CTaosInterface.useResult(conn)
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_SMALLINT):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_short))[0]
|
||||||
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_INT):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_int))[0]
|
||||||
print(data)
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BIGINT):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
|
||||||
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_FLOAT):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_float))[0]
|
||||||
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_DOUBLE):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_double))[0]
|
||||||
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_BINARY):
|
||||||
|
# return (ctypes.cast(data, ctypes.POINTER(ctypes.c_char))[0:byte]).rstrip('\x00')
|
||||||
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_TIMESTAMP):
|
||||||
|
# return ctypes.cast(data, ctypes.POINTER(ctypes.c_long))[0]
|
||||||
|
# elif (dtype == CTaosInterface.TSDB_DATA_TYPE_NCHAR):
|
||||||
|
# return (ctypes.cast(data, ctypes.c_char_p).value).rstrip('\x00')
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def errno(connection):
|
||||||
|
"""Return the error number.
|
||||||
|
"""
|
||||||
|
return CTaosInterface.libtaos.taos_errno(connection)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def errStr(connection):
|
||||||
|
"""Return the error styring
|
||||||
|
"""
|
||||||
|
return CTaosInterface.libtaos.taos_errstr(connection).decode('utf-8')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
cinter = CTaosInterface()
|
||||||
|
conn = cinter.connect()
|
||||||
|
|
||||||
|
print('Query return value: {}'.format(cinter.query(conn, 'show databases')))
|
||||||
|
print('Affected rows: {}'.format(cinter.affectedRows(conn)))
|
||||||
|
|
||||||
|
result, des = CTaosInterface.useResult(conn)
|
||||||
|
|
||||||
|
data, num_of_rows = CTaosInterface.fetchBlock(result, des)
|
||||||
|
|
||||||
|
print(data)
|
||||||
|
|
||||||
cinter.close(conn)
|
cinter.close(conn)
|
|
@ -1,81 +1,89 @@
|
||||||
# from .cursor import TDengineCursor
|
from .cursor import TDengineCursor
|
||||||
from .cursor import TDengineCursor
|
from .subscription import TDengineSubscription
|
||||||
from .cinterface import CTaosInterface
|
from .cinterface import CTaosInterface
|
||||||
|
|
||||||
class TDengineConnection(object):
|
class TDengineConnection(object):
|
||||||
""" TDengine connection object
|
""" TDengine connection object
|
||||||
"""
|
"""
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self._conn = None
|
self._conn = None
|
||||||
self._host = None
|
self._host = None
|
||||||
self._user = "root"
|
self._user = "root"
|
||||||
self._password = "taosdata"
|
self._password = "taosdata"
|
||||||
self._database = None
|
self._database = None
|
||||||
self._port = 0
|
self._port = 0
|
||||||
self._config = None
|
self._config = None
|
||||||
self._chandle = None
|
self._chandle = None
|
||||||
|
|
||||||
if len(kwargs) > 0:
|
if len(kwargs) > 0:
|
||||||
self.config(**kwargs)
|
self.config(**kwargs)
|
||||||
|
|
||||||
def config(self, **kwargs):
|
def config(self, **kwargs):
|
||||||
# host
|
# host
|
||||||
if 'host' in kwargs:
|
if 'host' in kwargs:
|
||||||
self._host = kwargs['host']
|
self._host = kwargs['host']
|
||||||
|
|
||||||
# user
|
# user
|
||||||
if 'user' in kwargs:
|
if 'user' in kwargs:
|
||||||
self._user = kwargs['user']
|
self._user = kwargs['user']
|
||||||
|
|
||||||
# password
|
# password
|
||||||
if 'password' in kwargs:
|
if 'password' in kwargs:
|
||||||
self._password = kwargs['password']
|
self._password = kwargs['password']
|
||||||
|
|
||||||
# database
|
# database
|
||||||
if 'database' in kwargs:
|
if 'database' in kwargs:
|
||||||
self._database = kwargs['database']
|
self._database = kwargs['database']
|
||||||
|
|
||||||
# port
|
# port
|
||||||
if 'port' in kwargs:
|
if 'port' in kwargs:
|
||||||
self._port = kwargs['port']
|
self._port = kwargs['port']
|
||||||
|
|
||||||
# config
|
# config
|
||||||
if 'config' in kwargs:
|
if 'config' in kwargs:
|
||||||
self._config = kwargs['config']
|
self._config = kwargs['config']
|
||||||
|
|
||||||
self._chandle = CTaosInterface(self._config)
|
self._chandle = CTaosInterface(self._config)
|
||||||
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
|
self._conn = self._chandle.connect(self._host, self._user, self._password, self._database, self._port)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""Close current connection.
|
"""Close current connection.
|
||||||
"""
|
"""
|
||||||
return CTaosInterface.close(self._conn)
|
return CTaosInterface.close(self._conn)
|
||||||
|
|
||||||
def cursor(self):
|
def subscribe(self, restart, topic, sql, interval):
|
||||||
"""Return a new Cursor object using the connection.
|
"""Create a subscription.
|
||||||
"""
|
"""
|
||||||
return TDengineCursor(self)
|
if self._conn is None:
|
||||||
|
return None
|
||||||
def commit(self):
|
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval)
|
||||||
"""Commit any pending transaction to the database.
|
return TDengineSubscription(sub)
|
||||||
|
|
||||||
Since TDengine do not support transactions, the implement is void functionality.
|
def cursor(self):
|
||||||
"""
|
"""Return a new Cursor object using the connection.
|
||||||
pass
|
"""
|
||||||
|
return TDengineCursor(self)
|
||||||
def rollback(self):
|
|
||||||
"""Void functionality
|
def commit(self):
|
||||||
"""
|
"""Commit any pending transaction to the database.
|
||||||
pass
|
|
||||||
|
Since TDengine do not support transactions, the implement is void functionality.
|
||||||
def clear_result_set(self):
|
"""
|
||||||
"""Clear unused result set on this connection.
|
pass
|
||||||
"""
|
|
||||||
result = self._chandle.useResult(self._conn)[0]
|
def rollback(self):
|
||||||
if result:
|
"""Void functionality
|
||||||
self._chandle.freeResult(result)
|
"""
|
||||||
|
pass
|
||||||
if __name__ == "__main__":
|
|
||||||
conn = TDengineConnection(host='192.168.1.107')
|
def clear_result_set(self):
|
||||||
conn.close()
|
"""Clear unused result set on this connection.
|
||||||
|
"""
|
||||||
|
result = self._chandle.useResult(self._conn)[0]
|
||||||
|
if result:
|
||||||
|
self._chandle.freeResult(result)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
conn = TDengineConnection(host='192.168.1.107')
|
||||||
|
conn.close()
|
||||||
print("Hello world")
|
print("Hello world")
|
|
@ -0,0 +1,52 @@
|
||||||
|
from .cinterface import CTaosInterface
|
||||||
|
from .error import *
|
||||||
|
|
||||||
|
class TDengineSubscription(object):
|
||||||
|
"""TDengine subscription object
|
||||||
|
"""
|
||||||
|
def __init__(self, sub):
|
||||||
|
self._sub = sub
|
||||||
|
|
||||||
|
|
||||||
|
def consume(self):
|
||||||
|
"""Consume rows of a subscription
|
||||||
|
"""
|
||||||
|
if self._sub is None:
|
||||||
|
raise OperationalError("Invalid use of consume")
|
||||||
|
|
||||||
|
result, fields = CTaosInterface.consume(self._sub)
|
||||||
|
buffer = [[] for i in range(len(fields))]
|
||||||
|
print(buffer)
|
||||||
|
while True:
|
||||||
|
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
|
||||||
|
if num_of_fields == 0: break
|
||||||
|
for i in range(len(fields)):
|
||||||
|
buffer[i].extend(block[i])
|
||||||
|
|
||||||
|
return list(map(tuple, zip(*buffer)))
|
||||||
|
|
||||||
|
|
||||||
|
def close(self, keepProgress = True):
|
||||||
|
"""Close the Subscription.
|
||||||
|
"""
|
||||||
|
if self._sub is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
CTaosInterface.unsubscribe(self._sub, keepProgress)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
from .connection import TDengineConnection
|
||||||
|
conn = TDengineConnection(host="127.0.0.1", user="root", password="taosdata", database="test")
|
||||||
|
|
||||||
|
# Generate a cursor object to run SQL commands
|
||||||
|
sub = conn.subscribe(True, "test", "select * from meters;", 1000)
|
||||||
|
|
||||||
|
for i in range(0,10):
|
||||||
|
data = sub.consume()
|
||||||
|
for d in data:
|
||||||
|
print(d)
|
||||||
|
|
||||||
|
sub.close()
|
||||||
|
conn.close()
|
Loading…
Reference in New Issue