docs: add tmq sample code
This commit is contained in:
parent
f4342a8b73
commit
bf932e1d77
|
@ -670,55 +670,126 @@ public class SchemalessInsertTest {
|
||||||
|
|
||||||
TDengine Java 连接器支持订阅功能,应用 API 如下:
|
TDengine Java 连接器支持订阅功能,应用 API 如下:
|
||||||
|
|
||||||
#### 创建订阅
|
#### 创建 Topic
|
||||||
|
|
||||||
```java
|
```java
|
||||||
TSDBSubscribe sub = ((TSDBConnection)conn).subscribe("topic", "select * from meters", false);
|
Connection connection = DriverManager.getConnection(url, properties);
|
||||||
|
Statement statement = connection.createStatement();
|
||||||
|
statement.executeUpdate("create topic if not exists topic_speed as select ts, speed from speed_table");
|
||||||
```
|
```
|
||||||
|
|
||||||
`subscribe` 方法的三个参数含义如下:
|
`subscribe` 方法的三个参数含义如下:
|
||||||
|
|
||||||
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
|
- topic_speed:订阅的主题(即名称),此参数是订阅的唯一标识。
|
||||||
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
|
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据。
|
||||||
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
|
|
||||||
|
|
||||||
如上面的例子将使用 SQL 语句 `select * from meters` 创建一个名为 `topic` 的订阅,如果这个订阅已经存在,将继续之前的查询进度,而不是从头开始消费所有的数据。
|
如上面的例子将使用 SQL 语句 `select ts, speed from speed_table` 创建一个名为 `topic_speed` 的订阅。
|
||||||
|
|
||||||
|
#### 创建 Consumer
|
||||||
|
|
||||||
|
```java
|
||||||
|
Properties config = new Properties();
|
||||||
|
config.setProperty("enable.auto.commit", "true");
|
||||||
|
config.setProperty("group.id", "group1");
|
||||||
|
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
|
||||||
|
|
||||||
|
TaosConsumer consumer = new TaosConsumer<>(config);
|
||||||
|
```
|
||||||
|
|
||||||
|
- enable.auto.commit: 是否允许自动提交。
|
||||||
|
- group.id: consumer: 所在的 group。
|
||||||
|
- value.deserializer: 结果集反序列化方法,可以继承 `com.taosdata.jdbc.tmq.ReferenceDeserializer`,并指定结果集 bean,实现反序列化。也可以继承 `com.taosdata.jdbc.tmq.Deserializer`,根据 SQL 的 resultSet 自定义反序列化方式。
|
||||||
|
|
||||||
#### 订阅消费数据
|
#### 订阅消费数据
|
||||||
|
|
||||||
```java
|
```java
|
||||||
int total = 0;
|
|
||||||
while(true) {
|
while(true) {
|
||||||
TSDBResultSet rs = sub.consume();
|
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||||
int count = 0;
|
for (ResultBean record : records) {
|
||||||
while(rs.next()) {
|
process(record);
|
||||||
count++;
|
}
|
||||||
}
|
|
||||||
total += count;
|
|
||||||
System.out.printf("%d rows consumed, total %d\n", count, total);
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
`consume` 方法返回一个结果集,其中包含从上次 `consume` 到目前为止的所有新数据。请务必按需选择合理的调用 `consume` 的频率(如例子中的 `Thread.sleep(1000)`),否则会给服务端造成不必要的压力。
|
`poll` 方法返回一个结果集,其中包含从上次 `poll` 到目前为止的所有新数据。请务必按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。
|
||||||
|
|
||||||
#### 关闭订阅
|
#### 关闭订阅
|
||||||
|
|
||||||
```java
|
```java
|
||||||
sub.close(true);
|
consumer.close()
|
||||||
```
|
```
|
||||||
|
|
||||||
`close` 方法关闭一个订阅。如果其参数为 `true` 表示保留订阅进度信息,后续可以创建同名订阅继续消费数据;如为 `false` 则不保留订阅进度。
|
### 使用示例如下:
|
||||||
|
|
||||||
### 关闭资源
|
|
||||||
|
|
||||||
```java
|
```java
|
||||||
resultSet.close();
|
public abstract class ConsumerLoop {
|
||||||
stmt.close();
|
private final TaosConsumer<ResultBean> consumer;
|
||||||
conn.close();
|
private final List<String> topics;
|
||||||
```
|
private final AtomicBoolean shutdown;
|
||||||
|
private final CountDownLatch shutdownLatch;
|
||||||
|
|
||||||
> `注意务必要将 connection 进行关闭`,否则会出现连接泄露。
|
public ConsumerLoop() throws SQLException {
|
||||||
|
Properties config = new Properties();
|
||||||
|
config.setProperty("msg.with.table.name", "true");
|
||||||
|
config.setProperty("enable.auto.commit", "true");
|
||||||
|
config.setProperty("group.id", "group1");
|
||||||
|
config.setProperty("value.deserializer", "com.taosdata.jdbc.tmq.ConsumerTest.ResultDeserializer");
|
||||||
|
|
||||||
|
this.consumer = new TaosConsumer<>(config);
|
||||||
|
this.topics = Collections.singletonList("topic_speed");
|
||||||
|
this.shutdown = new AtomicBoolean(false);
|
||||||
|
this.shutdownLatch = new CountDownLatch(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void process(ResultBean result);
|
||||||
|
|
||||||
|
public void pollData() throws SQLException {
|
||||||
|
try {
|
||||||
|
consumer.subscribe(topics);
|
||||||
|
|
||||||
|
while (!shutdown.get()) {
|
||||||
|
ConsumerRecords<ResultBean> records = consumer.poll(Duration.ofMillis(100));
|
||||||
|
for (ResultBean record : records) {
|
||||||
|
process(record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
consumer.close();
|
||||||
|
shutdownLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() throws InterruptedException {
|
||||||
|
shutdown.set(true);
|
||||||
|
shutdownLatch.await();
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ResultDeserializer extends ReferenceDeserializer<ResultBean> {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static class ResultBean {
|
||||||
|
private Timestamp ts;
|
||||||
|
private int speed;
|
||||||
|
|
||||||
|
public Timestamp getTs() {
|
||||||
|
return ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTs(Timestamp ts) {
|
||||||
|
this.ts = ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSpeed() {
|
||||||
|
return speed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setSpeed(int speed) {
|
||||||
|
this.speed = speed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### 与连接池使用
|
### 与连接池使用
|
||||||
|
|
||||||
|
|
|
@ -757,7 +757,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_prepareStmtImp(J
|
||||||
int32_t code = taos_stmt_prepare(pStmt, str, len);
|
int32_t code = taos_stmt_prepare(pStmt, str, len);
|
||||||
taosMemoryFreeClear(str);
|
taosMemoryFreeClear(str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
jniError("prepareStmt jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -785,7 +785,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
(*env)->ReleaseStringUTFChars(env, jname, name);
|
(*env)->ReleaseStringUTFChars(env, jname, name);
|
||||||
|
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
|
jniError("bindTableName jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -860,7 +860,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsI
|
||||||
(*env)->ReleaseStringUTFChars(env, tableName, name);
|
(*env)->ReleaseStringUTFChars(env, tableName, name);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
|
jniError("tableNameTags jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
return JNI_SUCCESS;
|
return JNI_SUCCESS;
|
||||||
|
@ -926,7 +926,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(
|
||||||
taosMemoryFreeClear(b);
|
taosMemoryFreeClear(b);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
jniError("bindColData jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -949,7 +949,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_addBatchImp(JNIEn
|
||||||
|
|
||||||
int32_t code = taos_stmt_add_batch(pStmt);
|
int32_t code = taos_stmt_add_batch(pStmt);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
jniError("add batch jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -973,7 +973,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
|
||||||
|
|
||||||
int32_t code = taos_stmt_execute(pStmt);
|
int32_t code = taos_stmt_execute(pStmt);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
jniError("excute batch jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -997,7 +997,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
|
||||||
|
|
||||||
int32_t code = taos_stmt_close(pStmt);
|
int32_t code = taos_stmt_close(pStmt);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
jniError("jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
jniError("close stmt jobj:%p, conn:%p, code:%s", jobj, tscon, tstrerror(code));
|
||||||
return JNI_TDENGINE_ERROR;
|
return JNI_TDENGINE_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue