Update 07-tmq.mdx
This commit is contained in:
parent
8d36afe6df
commit
dbf93120ab
|
@ -92,102 +92,21 @@ void close() throws SQLException;
|
|||
<TabItem value="Python" label="Python">
|
||||
```python
|
||||
class TaosConsumer():
|
||||
DEFAULT_CONFIG = {
|
||||
'group.id',
|
||||
'client.id',
|
||||
'enable.auto.commit',
|
||||
'auto.commit.interval.ms',
|
||||
'auto.offset.reset',
|
||||
'msg.with.table.name',
|
||||
'experimental.snapshot.enable',
|
||||
'enable.heartbeat.background',
|
||||
'experimental.snapshot.batch.size',
|
||||
'td.connect.ip',
|
||||
'td.connect.user',
|
||||
'td.connect.pass',
|
||||
'td.connect.port',
|
||||
'td.connect.db',
|
||||
'timeout'
|
||||
}
|
||||
def __init__(self, *topics, **configs)
|
||||
|
||||
def __init__(self, *topics, **configs):
|
||||
self._closed = True
|
||||
self._conf = None
|
||||
self._list = None
|
||||
self._tmq = None
|
||||
def __iter__(self)
|
||||
|
||||
keys = list(configs.keys())
|
||||
for k in keys:
|
||||
configs.update({k.replace('_','.'): configs.pop(k)})
|
||||
def __next__(self)
|
||||
|
||||
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
|
||||
if extra_configs:
|
||||
raise TmqError("Unrecognized configs: %s" % (extra_configs,))
|
||||
def sync_next(self)
|
||||
|
||||
self._conf = tmq_conf_new()
|
||||
self._list = tmq_list_new()
|
||||
def subscription(self)
|
||||
|
||||
# set poll timeout
|
||||
if 'timeout' in configs:
|
||||
self._timeout = configs['timeout']
|
||||
del configs['timeout']
|
||||
else:
|
||||
self._timeout = 0
|
||||
def unsubscribe(self)
|
||||
|
||||
# check if group id is set
|
||||
def close(self)
|
||||
|
||||
if 'group.id' not in configs:
|
||||
raise TmqError("missing group.id in consumer config setting")
|
||||
|
||||
for key, value in configs.items():
|
||||
tmq_conf_set(self._conf, key, value)
|
||||
|
||||
self._tmq = tmq_consumer_new(self._conf)
|
||||
|
||||
if not topics:
|
||||
raise TmqError("Unset topic for Consumer")
|
||||
|
||||
for topic in topics:
|
||||
tmq_list_append(self._list, topic)
|
||||
|
||||
tmq_subscribe(self._tmq, self._list)
|
||||
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
if not self._tmq:
|
||||
raise StopIteration('TaosConsumer closed')
|
||||
return next(self.sync_next())
|
||||
|
||||
def sync_next(self):
|
||||
while 1:
|
||||
res = tmq_consumer_poll(self._tmq, self._timeout)
|
||||
if res:
|
||||
break
|
||||
yield TaosResult(res)
|
||||
|
||||
def subscription(self):
|
||||
if self._tmq is None:
|
||||
return None
|
||||
return tmq_subscription(self._tmq)
|
||||
|
||||
def unsubscribe(self):
|
||||
tmq_unsubscribe(self._tmq)
|
||||
|
||||
def close(self):
|
||||
if self._tmq:
|
||||
tmq_consumer_close(self._tmq)
|
||||
self._tmq = None
|
||||
|
||||
def __del__(self):
|
||||
if self._conf:
|
||||
tmq_conf_destroy(self._conf)
|
||||
if self._list:
|
||||
tmq_list_destroy(self._list)
|
||||
if self._tmq:
|
||||
tmq_consumer_close(self._tmq)
|
||||
def __del__(self)
|
||||
```
|
||||
</TabItem>
|
||||
<TabItem label="Go" value="Go">
|
||||
|
@ -354,6 +273,8 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
|
||||
<TabItem value="Python" label="Python">
|
||||
|
||||
Python 使用以下配置项创建一个 Consumer 实例。
|
||||
|
||||
| 参数名称 | 类型 | 参数说明 | 备注 |
|
||||
| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- |
|
||||
| `td_connect_ip` | string | 用于创建连接,同 `taos_connect` | |
|
||||
|
@ -368,6 +289,7 @@ public class MetersDeserializer extends ReferenceDeserializer<Meters> {
|
|||
| `enable_heartbeat_background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
||||
| `experimental_snapshot_enable` | string | 从 WAL 开始消费,还是从 TSBS 开始消费 | 合法值:`true`, `false` |
|
||||
| `msg_with_table_name` | string | 是否允许从消息中解析表名 | 合法值:`true`, `false` |
|
||||
| `timeout` | int | 消费者拉去的超时时间 | |
|
||||
|
||||
</TabItem>
|
||||
|
||||
|
|
Loading…
Reference in New Issue