TBASE-1470: python3 & linux
This commit is contained in:
parent
bae48b73c6
commit
781c0ff6bc
|
@ -144,6 +144,8 @@ class CTaosInterface(object):
|
|||
libtaos.taos_use_result.restype = ctypes.c_void_p
|
||||
libtaos.taos_fetch_row.restype = ctypes.POINTER(ctypes.c_void_p)
|
||||
libtaos.taos_errstr.restype = ctypes.c_char_p
|
||||
libtaos.taos_subscribe.restype = ctypes.c_void_p
|
||||
libtaos.taos_consume.restype = ctypes.c_void_p
|
||||
|
||||
def __init__(self, config=None):
|
||||
'''
|
||||
|
@ -252,6 +254,41 @@ class CTaosInterface(object):
|
|||
"""
|
||||
return CTaosInterface.libtaos.taos_affected_rows(connection)
|
||||
|
||||
@staticmethod
|
||||
def subscribe(connection, restart, topic, sql, interval):
|
||||
"""Create a subscription
|
||||
@restart boolean,
|
||||
@sql string, sql statement for data query, must be a 'select' statement.
|
||||
@topic string, name of this subscription
|
||||
"""
|
||||
return ctypes.c_void_p(CTaosInterface.libtaos.taos_subscribe(
|
||||
connection,
|
||||
1 if restart else 0,
|
||||
ctypes.c_char_p(topic.encode('utf-8')),
|
||||
ctypes.c_char_p(sql.encode('utf-8')),
|
||||
None,
|
||||
None,
|
||||
interval))
|
||||
|
||||
@staticmethod
|
||||
def consume(sub):
|
||||
"""Consume data of a subscription
|
||||
"""
|
||||
result = ctypes.c_void_p(CTaosInterface.libtaos.taos_consume(sub))
|
||||
fields = []
|
||||
pfields = CTaosInterface.fetchFields(result)
|
||||
for i in range(CTaosInterface.libtaos.taos_num_fields(result)):
|
||||
fields.append({'name': pfields[i].name.decode('utf-8'),
|
||||
'bytes': pfields[i].bytes,
|
||||
'type': ord(pfields[i].type)})
|
||||
return result, fields
|
||||
|
||||
@staticmethod
|
||||
def unsubscribe(sub, keepProgress):
|
||||
"""Cancel a subscription
|
||||
"""
|
||||
CTaosInterface.libtaos.taos_unsubscribe(sub, 1 if keepProgress else 0)
|
||||
|
||||
@staticmethod
|
||||
def useResult(connection):
|
||||
'''Use result after calling self.query
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# from .cursor import TDengineCursor
|
||||
from .cursor import TDengineCursor
|
||||
from .subscription import TDengineSubscription
|
||||
from .cinterface import CTaosInterface
|
||||
|
||||
class TDengineConnection(object):
|
||||
|
@ -50,6 +50,14 @@ class TDengineConnection(object):
|
|||
"""
|
||||
return CTaosInterface.close(self._conn)
|
||||
|
||||
def subscribe(self, restart, topic, sql, interval):
|
||||
"""Create a subscription.
|
||||
"""
|
||||
if self._conn is None:
|
||||
return None
|
||||
sub = CTaosInterface.subscribe(self._conn, restart, topic, sql, interval)
|
||||
return TDengineSubscription(sub)
|
||||
|
||||
def cursor(self):
|
||||
"""Return a new Cursor object using the connection.
|
||||
"""
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
from .cinterface import CTaosInterface
|
||||
from .error import *
|
||||
|
||||
class TDengineSubscription(object):
|
||||
"""TDengine subscription object
|
||||
"""
|
||||
def __init__(self, sub):
|
||||
self._sub = sub
|
||||
|
||||
|
||||
def consume(self):
|
||||
"""Consume rows of a subscription
|
||||
"""
|
||||
if self._sub is None:
|
||||
raise OperationalError("Invalid use of consume")
|
||||
|
||||
result, fields = CTaosInterface.consume(self._sub)
|
||||
buffer = [[] for i in range(len(fields))]
|
||||
print(buffer)
|
||||
while True:
|
||||
block, num_of_fields = CTaosInterface.fetchBlock(result, fields)
|
||||
if num_of_fields == 0: break
|
||||
for i in range(len(fields)):
|
||||
buffer[i].extend(block[i])
|
||||
|
||||
return list(map(tuple, zip(*buffer)))
|
||||
|
||||
|
||||
def close(self, keepProgress = True):
|
||||
"""Close the Subscription.
|
||||
"""
|
||||
if self._sub is None:
|
||||
return False
|
||||
|
||||
CTaosInterface.unsubscribe(self._sub, keepProgress)
|
||||
return True
|
Loading…
Reference in New Issue