58 lines
1.4 KiB
Python
58 lines
1.4 KiB
Python
from .cinterface import CTaosInterface
|
|
from .error import *
|
|
|
|
|
|
class TDengineSubscription(object):
    """TDengine subscription object.

    Wraps the raw subscription handle given to the constructor and forwards
    ``consume`` / ``close`` to the corresponding ``CTaosInterface`` calls.
    """

    def __init__(self, sub):
        """Store the subscription handle.

        :param sub: subscription handle handed on to ``CTaosInterface``;
            ``None`` marks the subscription as unusable.
        """
        self._sub = sub
        # Field descriptions from the most recent consume(); None until then.
        self.fields = None

    def consume(self):
        """Consume rows of a subscription.

        Asks ``CTaosInterface.consume`` for a result and its fields, drains
        the result block by block, and stores the fields on ``self.fields``.

        :return: list of row tuples, one value per field in each tuple.
        :raises OperationalError: if the subscription has no handle (it was
            created with ``None`` or has already been closed).
        """
        if self._sub is None:
            raise OperationalError("Invalid use of consume")

        result, fields = CTaosInterface.consume(self._sub)

        # One accumulator per field: blocks are indexed by field position,
        # so values are collected column-wise and transposed at the end.
        columns = [[] for _ in range(len(fields))]
        while True:
            block, count = CTaosInterface.fetchBlock(result, fields)
            # A zero count is the end-of-data signal; check it before
            # touching ``block``.
            if count == 0:
                break
            for index, column in enumerate(columns):
                column.extend(block[index])

        self.fields = fields
        # Transpose the column buffers into a list of row tuples.
        return list(zip(*columns))

    def close(self, keepProgress=True):
        """Close the Subscription.

        :param keepProgress: forwarded unchanged to
            ``CTaosInterface.unsubscribe`` (presumably controls whether the
            server keeps the consumption progress -- confirm against
            ``CTaosInterface``).
        :return: ``True`` if the subscription was unsubscribed by this call,
            ``False`` if there was no handle (never opened or already closed).
        """
        if self._sub is None:
            return False

        CTaosInterface.unsubscribe(self._sub, keepProgress)
        # Drop the handle so a second close() is a no-op and a later
        # consume() raises instead of reusing a released subscription.
        self._sub = None
        return True
|
|
|
|
|
|
if __name__ == '__main__':
    # Manual smoke test: connects to a local server, polls a subscription
    # ten times and prints every row received.
    from .connection import TDengineConnection

    conn = TDengineConnection(
        host="127.0.0.1",
        user="root",
        password="taosdata",
        database="test")

    try:
        # Create the subscription object used to poll for query results.
        # NOTE(review): arguments are presumably (restart, topic, sql,
        # interval) -- confirm against TDengineConnection.subscribe.
        sub = conn.subscribe(True, "test", "select * from meters;", 1000)
        try:
            for _ in range(10):
                for row in sub.consume():
                    print(row)
        finally:
            # Release the subscription even if consuming or printing fails.
            sub.close()
    finally:
        # Always close the connection, including when subscribe() fails.
        conn.close()
|