Merge branch 'docs/wade-3.0-shibin' into docs/wade-3.0
This commit is contained in:
commit
52fab24a89
|
@ -13,12 +13,18 @@ int main() {
|
|||
uint16_t port = 0; // 0 means use the default port
|
||||
TAOS *taos = taos_connect(host, user, passwd, db, port);
|
||||
if (taos == NULL) {
|
||||
int errno = taos_errno(NULL);
|
||||
char *msg = taos_errstr(NULL);
|
||||
int errno = taos_errno(NULL);
|
||||
const char *msg = taos_errstr(NULL);
|
||||
printf("%d, %s\n", errno, msg);
|
||||
} else {
|
||||
printf("connected\n");
|
||||
taos_close(taos);
|
||||
printf("failed to connect to server %s, errno: %d, msg: %s\n", host, errno, msg);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
printf("success to connect server %s\n", host);
|
||||
|
||||
/* put your code here for read and write */
|
||||
|
||||
// close & clean
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
}
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o CCreateDBDemo CCreateDBDemo.c -ltaos
|
||||
// to compile: gcc -o create_db_demo create_db_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
|
@ -32,22 +32,23 @@ const char *password = "taosdata";
|
|||
// connect
|
||||
TAOS *taos = taos_connect(ip, user, password, NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason: %s\n", taos_errstr(NULL));
|
||||
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL));
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
printf("success to connect server %s\n", ip);
|
||||
|
||||
// create database
|
||||
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create database, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to create database power, reason: %s\n", taos_errstr(result));
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(result);
|
||||
printf("success to create database\n");
|
||||
printf("success to create database power\n");
|
||||
|
||||
// use database
|
||||
result = taos_query(taos, "USE power");
|
||||
|
@ -58,13 +59,13 @@ const char* sql = "CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLO
|
|||
result = taos_query(taos, sql);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create table, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to create stable meters, reason: %s\n", taos_errstr(result));
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(result);
|
||||
printf("success to create table\n");
|
||||
printf("success to create table meters\n");
|
||||
|
||||
// close & clean
|
||||
taos_close(taos);
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o CInsertDataDemo CInsertDataDemo.c -ltaos
|
||||
// to compile: gcc -o insert_data_demo insert_data_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
|
@ -31,10 +31,11 @@ const char *password = "taosdata";
|
|||
// connect
|
||||
TAOS *taos = taos_connect(ip, user, password, NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason: %s\n", taos_errstr(NULL));
|
||||
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL));
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
printf("success to connect server %s\n", ip);
|
||||
|
||||
// use database
|
||||
TAOS_RES *result = taos_query(taos, "USE power");
|
||||
|
@ -53,7 +54,7 @@ const char* sql = "INSERT INTO "
|
|||
result = taos_query(taos, sql);
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to insert rows, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to insert data to power.meters, ip: %s, reason: %s\n", ip, taos_errstr(result));
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
|
@ -62,7 +63,7 @@ taos_free_result(result);
|
|||
|
||||
// you can check affectedRows here
|
||||
int rows = taos_affected_rows(result);
|
||||
printf("success to insert %d rows\n", rows);
|
||||
printf("success to insert %d rows data to power.meters\n", rows);
|
||||
|
||||
// close & clean
|
||||
taos_close(taos);
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o CQueryDataDemo CQueryDataDemo.c -ltaos
|
||||
// to compile: gcc -o query_data_demo query_data_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
|
@ -32,21 +32,22 @@ const char *password = "taosdata";
|
|||
// connect
|
||||
TAOS *taos = taos_connect(ip, user, password, NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason: %s\n", taos_errstr(NULL));
|
||||
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL));
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
printf("success to connect server %s\n", ip);
|
||||
|
||||
// use database
|
||||
TAOS_RES *result = taos_query(taos, "USE power");
|
||||
taos_free_result(result);
|
||||
|
||||
// query data, please make sure the database and table are already created
|
||||
const char* sql = "SELECT * FROM power.meters";
|
||||
const char* sql = "SELECT ts, current, location FROM power.meters limit 100";
|
||||
result = taos_query(taos, sql);
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to select, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to query data from power.meters, ip: %s, reason: %s\n", ip, taos_errstr(result));
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
|
@ -69,6 +70,7 @@ while ((row = taos_fetch_row(result))) {
|
|||
}
|
||||
printf("total rows: %d\n", rows);
|
||||
taos_free_result(result);
|
||||
printf("success to query data from power.meters\n");
|
||||
|
||||
// close & clean
|
||||
taos_close(taos);
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o CSmlInsertDemo CSmlInsertDemo.c -ltaos
|
||||
// to compile: gcc -o sml_insert_demo sml_insert_demo.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
|
@ -31,22 +31,23 @@ const char *password = "taosdata";
|
|||
// connect
|
||||
TAOS *taos = taos_connect(ip, user, password, NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason: %s\n", taos_errstr(NULL));
|
||||
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL));
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
printf("success to connect server %s\n", ip);
|
||||
|
||||
// create database
|
||||
TAOS_RES *result = taos_query(taos, "CREATE DATABASE IF NOT EXISTS power");
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create database, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to create database power, reason: %s\n", taos_errstr(result));
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(result);
|
||||
printf("success to create database\n");
|
||||
printf("success to create database power\n");
|
||||
|
||||
// use database
|
||||
result = taos_query(taos, "USE power");
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o CStmtInsertDemo CStmtInsertDemo.c -ltaos
|
||||
// to compile: gcc -o stmt_insert_demo stmt_insert_demo.c -ltaos
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
|
@ -1,333 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
const char* topic_name = "topicname";
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024];
|
||||
int32_t rows = 0;
|
||||
|
||||
const char* topicName = tmq_get_topic_name(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
// int32_t* length = taos_fetch_lengths(msg);
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
rows++;
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
printf("precision: %d, row content: %s\n", precision, buf);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
|
||||
static int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes;
|
||||
// drop database if exists
|
||||
printf("create database\n");
|
||||
pRes = taos_query(pConn, "drop topic topicname");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop topicname, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create database
|
||||
pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create super table
|
||||
printf("create super table\n");
|
||||
pRes = taos_query(
|
||||
pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create sub tables
|
||||
printf("create sub tables\n");
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb0 using tmqdb.stb tags(0, 'subtable0')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb1 using tmqdb.stb tags(1, 'subtable1')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb1, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb2 using tmqdb.stb tags(2, 'subtable2')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb2, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tmqdb.ctb3 using tmqdb.stb tags(3, 'subtable3')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table ctb3, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// insert data
|
||||
printf("insert data into sub tables\n");
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb0 values(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb1 values(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb2 values(now, 2, 2, 'a1')(now+1s, 22, 22, 'a22')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "insert into tmqdb.ctb3 values(now, 3, 3, 'a1')(now+1s, 33, 33, 'a33')");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to insert into ctb0, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
|
||||
END:
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use tmqdb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3, tbname from tmqdb.stb where c1 > 1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
tmq_conf_res_t code;
|
||||
tmq_t* tmq = NULL;
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "group.id", "cgrpName");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "client.id", "user defined name");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
_end:
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
int32_t code = tmq_list_append(topicList, topic_name);
|
||||
if (code) {
|
||||
tmq_list_destroy(topicList);
|
||||
return NULL;
|
||||
}
|
||||
return topicList;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t* tmq) {
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 5000;
|
||||
while (running) {
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
totalRows += msg_process(tmqmsg);
|
||||
taos_free_result(tmqmsg);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
void consume_repeatly(tmq_t* tmq) {
|
||||
int32_t numOfAssignment = 0;
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
|
||||
}
|
||||
|
||||
// seek to the earliest offset
|
||||
for(int32_t i = 0; i < numOfAssignment; ++i) {
|
||||
tmq_topic_assignment* p = &pAssign[i];
|
||||
|
||||
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code));
|
||||
}
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
// let's do it again
|
||||
basic_consume_loop(tmq);
|
||||
}
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int32_t code;
|
||||
|
||||
if (init_env() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_topic() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_t* tmq = build_consumer();
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "build_consumer() fail!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if (NULL == topic_list) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((code = tmq_subscribe(tmq, topic_list))) {
|
||||
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
||||
}
|
||||
|
||||
tmq_list_destroy(topic_list);
|
||||
|
||||
basic_consume_loop(tmq);
|
||||
|
||||
consume_repeatly(tmq);
|
||||
|
||||
code = tmq_consumer_close(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
|
||||
} else {
|
||||
fprintf(stderr, "Consumer closed\n");
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,412 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <pthread.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
#include "taos.h"
|
||||
|
||||
volatile int thread_stop = 0;
|
||||
static int running = 1;
|
||||
const char* topic_name = "topic_meters";
|
||||
|
||||
void* prepare_data(void* arg) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes;
|
||||
int i = 1;
|
||||
|
||||
while (!thread_stop) {
|
||||
char buf[200] = {0};
|
||||
i++;
|
||||
snprintf(
|
||||
buf, sizeof(buf),
|
||||
"INSERT INTO power.d1001 USING power.meters TAGS(2,'California.SanFrancisco') VALUES (NOW + %da, 10.30000, "
|
||||
"219, 0.31000)",
|
||||
i);
|
||||
|
||||
pRes = taos_query(pConn, buf);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in insert data to power.meters, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
sleep(1);
|
||||
}
|
||||
printf("prepare data thread exit\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// ANCHOR: msg_process
|
||||
static int32_t msg_process(TAOS_RES* msg) {
|
||||
char buf[1024]; // buf to store the row content
|
||||
int32_t rows = 0;
|
||||
const char* topicName = tmq_get_topic_name(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
printf("topic: %s\n", topicName);
|
||||
printf("db: %s\n", dbName);
|
||||
printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
while (1) {
|
||||
// get one row data from message
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
|
||||
// get the field information
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
// get the number of fields
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
// get the precision of the result
|
||||
int32_t precision = taos_result_precision(msg);
|
||||
rows++;
|
||||
// print the row content
|
||||
if (taos_print_row(buf, row, fields, numOfFields) < 0) {
|
||||
printf("failed to print row\n");
|
||||
break;
|
||||
}
|
||||
// print the precision and row content to the console
|
||||
printf("precision: %d, row content: %s\n", precision, buf);
|
||||
}
|
||||
|
||||
return rows;
|
||||
}
|
||||
// ANCHOR_END: msg_process
|
||||
|
||||
static int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes;
|
||||
// drop database if exists
|
||||
printf("create database\n");
|
||||
pRes = taos_query(pConn, "drop topic if exists topic_meters");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop topic_meters, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "drop database if exists power");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in drop power, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create database
|
||||
pRes = taos_query(pConn, "create database power precision 'ms' WAL_RETENTION_PERIOD 3600");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create super table
|
||||
printf("create super table\n");
|
||||
pRes = taos_query(
|
||||
pConn,
|
||||
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
|
||||
"(groupId INT, location BINARY(24))");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
|
||||
goto END;
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
|
||||
END:
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t create_topic() {
|
||||
printf("create topic\n");
|
||||
TAOS_RES* pRes;
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRes = taos_query(pConn, "use power");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(
|
||||
pConn,
|
||||
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_meters, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
|
||||
}
|
||||
|
||||
// ANCHOR: create_consumer_1
|
||||
tmq_t* build_consumer() {
|
||||
tmq_conf_res_t code;
|
||||
tmq_t* tmq = NULL;
|
||||
|
||||
// create a configuration object
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
// set the configuration parameters
|
||||
code = tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "group.id", "group1");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "client.id", "client1");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.user", "root");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
code = tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||
if (TMQ_CONF_OK != code) {
|
||||
tmq_conf_destroy(conf);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// set the callback function for auto commit
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
// create a consumer object
|
||||
tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
_end:
|
||||
// destroy the configuration object
|
||||
tmq_conf_destroy(conf);
|
||||
return tmq;
|
||||
}
|
||||
// ANCHOR_END: create_consumer_1
|
||||
|
||||
// ANCHOR: build_topic_list
|
||||
// build a topic list used to subscribe
|
||||
tmq_list_t* build_topic_list() {
|
||||
// create a empty topic list
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
const char* topic_name = "topic_meters";
|
||||
|
||||
// append topic name to the list
|
||||
int32_t code = tmq_list_append(topicList, topic_name);
|
||||
if (code) {
|
||||
// if failed, destroy the list and return NULL
|
||||
tmq_list_destroy(topicList);
|
||||
return NULL;
|
||||
}
|
||||
// if success, return the list
|
||||
return topicList;
|
||||
}
|
||||
// ANCHOR_END: build_topic_list
|
||||
|
||||
// ANCHOR: basic_consume_loop
|
||||
void basic_consume_loop(tmq_t* tmq) {
|
||||
int32_t totalRows = 0; // total rows consumed
|
||||
int32_t msgCnt = 0; // total messages consumed
|
||||
int32_t timeout = 5000; // poll timeout
|
||||
|
||||
while (running) {
|
||||
// poll message from TDengine
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
// process the message
|
||||
totalRows += msg_process(tmqmsg);
|
||||
// free the message
|
||||
taos_free_result(tmqmsg);
|
||||
}
|
||||
if (msgCnt > 10) {
|
||||
// consume 10 messages and break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// print the result: total messages and total rows consumed
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
// ANCHOR_END: basic_consume_loop
|
||||
|
||||
// ANCHOR: consume_repeatly
|
||||
void consume_repeatly(tmq_t* tmq) {
|
||||
int32_t numOfAssignment = 0;
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
|
||||
// get the topic assignment
|
||||
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
|
||||
if (code != 0 || pAssign == NULL || numOfAssignment == 0) {
|
||||
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
|
||||
return;
|
||||
}
|
||||
|
||||
// seek to the earliest offset
|
||||
for (int32_t i = 0; i < numOfAssignment; ++i) {
|
||||
tmq_topic_assignment* p = &pAssign[i];
|
||||
|
||||
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code));
|
||||
}
|
||||
}
|
||||
|
||||
// free the assignment array
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
// let's consume the messages again
|
||||
basic_consume_loop(tmq);
|
||||
}
|
||||
// ANCHOR_END: consume_repeatly
|
||||
|
||||
// ANCHOR: manual_commit
|
||||
void manual_commit(tmq_t* tmq) {
|
||||
int32_t totalRows = 0; // total rows consumed
|
||||
int32_t msgCnt = 0; // total messages consumed
|
||||
int32_t timeout = 5000; // poll timeout
|
||||
|
||||
while (running) {
|
||||
// poll message from TDengine
|
||||
TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
|
||||
if (tmqmsg) {
|
||||
msgCnt++;
|
||||
// process the message
|
||||
totalRows += msg_process(tmqmsg);
|
||||
// commit the message
|
||||
int32_t code = tmq_commit_sync(tmq, tmqmsg);
|
||||
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to commit message: %s\n", tmq_err2str(code));
|
||||
// free the message
|
||||
taos_free_result(tmqmsg);
|
||||
break;
|
||||
}
|
||||
// free the message
|
||||
taos_free_result(tmqmsg);
|
||||
}
|
||||
if (msgCnt > 10) {
|
||||
// consume 10 messages and break
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// print the result: total messages and total rows consumed
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
// ANCHOR_END: manual_commit
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
int32_t code;
|
||||
pthread_t thread_id;
|
||||
|
||||
if (init_env() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (create_topic() < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pthread_create(&thread_id, NULL, &prepare_data, NULL)) {
|
||||
fprintf(stderr, "create thread failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
// ANCHOR: create_consumer_2
|
||||
tmq_t* tmq = build_consumer();
|
||||
if (NULL == tmq) {
|
||||
fprintf(stderr, "build consumer to localhost fail!\n");
|
||||
return -1;
|
||||
}
|
||||
printf("build consumer to localhost successfully \n");
|
||||
|
||||
// ANCHOR_END: create_consumer_2
|
||||
|
||||
// ANCHOR: subscribe_3
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if (NULL == topic_list) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((code = tmq_subscribe(tmq, topic_list))) {
|
||||
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
|
||||
}
|
||||
|
||||
tmq_list_destroy(topic_list);
|
||||
|
||||
basic_consume_loop(tmq);
|
||||
// ANCHOR_END: subscribe_3
|
||||
|
||||
consume_repeatly(tmq);
|
||||
|
||||
manual_commit(tmq);
|
||||
|
||||
// ANCHOR: unsubscribe_and_close
|
||||
// unsubscribe the topic
|
||||
code = tmq_unsubscribe(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to tmq_unsubscribe(): %s\n", tmq_err2str(code));
|
||||
}
|
||||
fprintf(stderr, "Unsubscribed consumer successfully.\n");
|
||||
// close the consumer
|
||||
code = tmq_consumer_close(tmq);
|
||||
if (code) {
|
||||
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
|
||||
} else {
|
||||
fprintf(stderr, "Consumer closed successfully.\n");
|
||||
}
|
||||
// ANCHOR_END: unsubscribe_and_close
|
||||
|
||||
thread_stop = 1;
|
||||
pthread_join(thread_id, NULL);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
// TAOS standard API example. The same syntax as MySQL, but only a subset
|
||||
// to compile: gcc -o CWithReqIdDemo CWithReqIdDemo.c -ltaos
|
||||
// to compile: gcc -o with_reqid_demo with_reqid_demo.c -ltaos
|
||||
|
||||
#include <inttypes.h>
|
||||
#include <stdio.h>
|
||||
|
@ -32,23 +32,24 @@ const char *password = "taosdata";
|
|||
// connect
|
||||
TAOS *taos = taos_connect(ip, user, password, NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason: %s\n", taos_errstr(NULL));
|
||||
printf("failed to connect to server %s, reason: %s\n", ip, taos_errstr(NULL));
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
printf("success to connect server %s\n", ip);
|
||||
|
||||
// create database
|
||||
TAOS_RES *result = taos_query_with_reqid(taos, "CREATE DATABASE IF NOT EXISTS power", 1L);
|
||||
int code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to create database, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to create database power, reason: %s\n", taos_errstr(result));
|
||||
taos_free_result(result);
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(result);
|
||||
printf("success to create database\n");
|
||||
printf("success to create database power\n");
|
||||
|
||||
// use database
|
||||
result = taos_query_with_reqid(taos, "USE power", 2L);
|
||||
|
@ -59,7 +60,7 @@ const char* sql = "SELECT * FROM power.meters";
|
|||
result = taos_query_with_reqid(taos, sql, 3L);
|
||||
code = taos_errno(result);
|
||||
if (code != 0) {
|
||||
printf("failed to select, reason: %s\n", taos_errstr(result));
|
||||
printf("failed to query data from power.meters, ip: %s, reason: %s\n", ip, taos_errstr(result));
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
return -1;
|
||||
|
@ -82,6 +83,7 @@ while ((row = taos_fetch_row(result))) {
|
|||
}
|
||||
printf("total rows: %d\n", rows);
|
||||
taos_free_result(result);
|
||||
printf("success to query data from power.meters\n");
|
||||
|
||||
// close & clean
|
||||
taos_close(taos);
|
|
@ -380,16 +380,7 @@ DSN 的详细说明和如何使用详见 [连接功能](../../reference/connecto
|
|||
下面为建立连接的示例代码,其中省略了查询和写入部分,展示了如何建立连接、关闭连接以及清除资源。
|
||||
|
||||
```c
|
||||
TAOS *taos = taos_connect("localhost:6030", "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* put your code here for read and write */
|
||||
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
{{#include docs/examples/c/connect_example.c}}
|
||||
```
|
||||
|
||||
在上面的示例代码中, `taos_connect()` 建立到客户端程序所在主机的 6030 端口的连接,`taos_close()`关闭当前连接,`taos_cleanup()`清除客户端驱动所申请和使用的资源。
|
||||
|
|
|
@ -69,7 +69,7 @@ REST API:直接调用 `taosadapter` 提供的 REST API 接口,进行数据
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/CCreateDBDemo.c:create_db_and_table}}
|
||||
{{#include docs/examples/c/create_db_demo.c:create_db_and_table}}
|
||||
```
|
||||
> **注意**:如果不使用 `USE power` 指定数据库,则后续对表的操作都需要增加数据库名称作为前缀,如 power.meters。
|
||||
</TabItem>
|
||||
|
@ -146,7 +146,7 @@ NOW 为系统内部函数,默认为客户端所在计算机当前时间。 NOW
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/CInsertDataDemo.c:insert_data}}
|
||||
{{#include docs/examples/c/insert_data_demo.c:insert_data}}
|
||||
```
|
||||
|
||||
**Note**
|
||||
|
@ -215,7 +215,7 @@ curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' \
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/CQueryDataDemo.c:query_data}}
|
||||
{{#include docs/examples/c/query_data_demo.c:query_data}}
|
||||
```
|
||||
</TabItem>
|
||||
<TabItem label="REST API" value="rest">
|
||||
|
@ -290,7 +290,7 @@ reqId 可用于请求链路追踪,reqId 就像分布式系统中的 traceId
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/CWithReqIdDemo.c:with_reqid}}
|
||||
{{#include docs/examples/c/with_reqid_demo.c:with_reqid}}
|
||||
```
|
||||
</TabItem>
|
||||
<TabItem label="REST API" value="rest">
|
||||
|
|
|
@ -246,7 +246,7 @@ writer.write(lineDemo, SchemalessProtocolType.LINE, SchemalessTimestampType.NANO
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/CSmlInsertDemo.c:schemaless}}
|
||||
{{#include docs/examples/c/sml_insert_demo.c:schemaless}}
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ import TabItem from "@theme/TabItem";
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/CStmtInsertDemo.c}}
|
||||
{{#include docs/examples/c/stmt_insert_demo.c}}
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
|
|
|
@ -93,6 +93,8 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
|||
</TabItem>
|
||||
<TabItem label="C" value="c">
|
||||
|
||||
同通用基础配置项。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
||||
|
@ -194,6 +196,15 @@ Rust 连接器创建消费者的参数为 DSN, 可以设置的参数列表请
|
|||
|
||||
<TabItem label="C" value="c">
|
||||
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:create_consumer_1}}
|
||||
```
|
||||
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:create_consumer_2}}
|
||||
```
|
||||
|
||||
调用 `build_consumer` 函数尝试获取消费者实例 `tmq`。成功则打印成功日志,失败则打印失败日志。
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
@ -294,6 +305,28 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
</TabItem>
|
||||
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:build_topic_list}}
|
||||
```
|
||||
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:basic_consume_loop}}
|
||||
```
|
||||
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:msg_process}}
|
||||
```
|
||||
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:subscribe_3}}
|
||||
```
|
||||
|
||||
订阅消费数据步骤:
|
||||
1. 调用 `build_topic_list` 函数创建一个主题列表 `topic_list`。
|
||||
2. 如果 `topic_list` 为 `NULL`,表示创建失败,函数返回 `-1`。
|
||||
3. 使用 `tmq_subscribe` 函数订阅 `tmq` 指定的主题列表。如果订阅失败,打印错误信息。
|
||||
4. 销毁主题列表 `topic_list` 以释放资源。
|
||||
5. 调用 `basic_consume_loop` 函数开始基本的消费循环,处理订阅的消息。
|
||||
|
||||
</TabItem>
|
||||
|
||||
|
@ -406,6 +439,16 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
</TabItem>
|
||||
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:consume_repeatly}}
|
||||
```
|
||||
|
||||
1. 通过 `tmq_get_topic_assignment` 函数获取特定主题的分配信息,包括分配的数量和具体分配详情。
|
||||
2. 如果获取分配信息失败,则打印错误信息并返回。
|
||||
3. 对于每个分配,使用 `tmq_offset_seek` 函数将消费者的偏移量设置到最早的偏移量。
|
||||
4. 如果设置偏移量失败,则打印错误信息。
|
||||
5. 释放分配信息数组以释放资源。
|
||||
6. 调用 `basic_consume_loop` 函数开始新的的消费循环,处理消息。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
@ -495,10 +538,6 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
可以通过 `consumer.commit` 方法来手工提交消费进度。
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.js" value="node">
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="C#" value="csharp">
|
||||
```csharp
|
||||
{{#include docs/examples/csharp/subscribe/Program.cs:commit_offset}}
|
||||
|
@ -506,6 +545,11 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
</TabItem>
|
||||
|
||||
<TabItem label="C" value="c">
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:manual_commit}}
|
||||
```
|
||||
|
||||
可以通过 `tmq_commit_sync` 函数来手工提交消费进度。
|
||||
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
@ -589,10 +633,6 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
**注意**:消费者取消订阅后无法重用,如果想订阅新的 `topic`, 请重新创建消费者。
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.js" value="node">
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="C#" value="csharp">
|
||||
```csharp
|
||||
{{#include docs/examples/csharp/subscribe/Program.cs:close}}
|
||||
|
@ -600,7 +640,9 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
</TabItem>
|
||||
|
||||
<TabItem label="C" value="c">
|
||||
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c:unsubscribe_and_close}}
|
||||
```
|
||||
</TabItem>
|
||||
|
||||
</Tabs>
|
||||
|
@ -693,10 +735,6 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
```
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="Node.js" value="node">
|
||||
|
||||
</TabItem>
|
||||
|
||||
<TabItem label="C#" value="csharp">
|
||||
```csharp
|
||||
{{#include docs/examples/csharp/subscribe/Program.cs}}
|
||||
|
@ -704,6 +742,11 @@ TMQ 消息队列是一个 [futures::Stream](https://docs.rs/futures/latest/futur
|
|||
</TabItem>
|
||||
|
||||
<TabItem label="C" value="c">
|
||||
|
||||
<details>
|
||||
<summary>完整原生连接代码示例</summary>
|
||||
```c
|
||||
{{#include docs/examples/c/tmq_demo.c}}
|
||||
```
|
||||
</details>
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
|
|
|
@ -29,37 +29,6 @@ TDengine 客户端驱动的动态库位于:
|
|||
|
||||
TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一对应的强对应关系,建议使用与 TDengine 服务端完全相同的客户端驱动。虽然低版本的客户端驱动在前三段版本号一致(即仅第四段版本号不同)的情况下也能够与高版本的服务端相兼容,但这并非推荐用法。强烈不建议使用高版本的客户端驱动访问低版本的服务端。
|
||||
|
||||
## 安装步骤
|
||||
|
||||
TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
|
||||
|
||||
## 建立连接
|
||||
|
||||
使用客户端驱动访问 TDengine 集群的基本过程为:建立连接、查询和写入、关闭连接、清除资源。
|
||||
|
||||
下面为建立连接的示例代码,其中省略了查询和写入部分,展示了如何建立连接、关闭连接以及清除资源。
|
||||
|
||||
```c
|
||||
TAOS *taos = taos_connect("localhost:6030", "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) {
|
||||
printf("failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* put your code here for read and write */
|
||||
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
```
|
||||
|
||||
在上面的示例代码中, `taos_connect()` 建立到客户端程序所在主机的 6030 端口的连接,`taos_close()`关闭当前连接,`taos_cleanup()`清除客户端驱动所申请和使用的资源。
|
||||
|
||||
:::note
|
||||
|
||||
- 如未特别说明,当 API 的返回值是整数时,_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。
|
||||
- 所有的错误码以及对应的原因描述在 `taoserror.h` 文件中。 [错误码文档](../cpp/error-code)
|
||||
|
||||
:::
|
||||
## 错误码
|
||||
|
||||
在 C 接口的设计中,错误码采用整数类型表示,每个错误码都对应一个特定的错误状态。如未特别说明,当 API 的返回值是整数时,_0_ 代表成功,其它是代表失败原因的错误码,当返回值是指针时, _NULL_ 表示失败。
|
||||
|
@ -230,7 +199,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
|
|||
|
||||
- `int taos_init()`
|
||||
- **接口说明**:初始化运行环境。如果没有主动调用该 API,那么调用 `taos_connect()` 时驱动将自动调用该 API,故程序一般无需手动调用。
|
||||
- **返回值**:`0`:成功,`非0`:失败,可调用函数 taos_errstr(NULL) 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功,非 `0`:失败,可调用函数 taos_errstr(NULL) 获取更详细的错误信息。
|
||||
|
||||
- `void taos_cleanup()`
|
||||
- **接口说明**:清理运行环境,应用退出前应调用。
|
||||
|
@ -280,7 +249,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
|
|||
- **参数说明**:
|
||||
- taos:[入参] 指向数据库连接的指针,数据库连接是通过 `taos_connect()` 函数建立。
|
||||
- db:[入参] 数据库名称。
|
||||
- **返回值**:`0`:成功,`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功,非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_get_current_db(TAOS *taos, char *database, int len, int *required)`
|
||||
- **接口说明**:获取当前数据库名称。
|
||||
|
@ -447,21 +416,21 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- sql:[入参] 需要解析的 SQL 语句。
|
||||
- length:[入参] 参数 sql 的长度。如果参数 length 大于 0,将使用此参数作为 SQL 语句的长度,如等于 0,将自动判断 SQL 语句的长度。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)`
|
||||
- **接口说明**:绑定参数到一个预编译的 SQL 语句。不如 `taos_stmt_bind_param_batch()` 效率高,但可以支持非 INSERT 类型的 SQL 语句。
|
||||
- **参数说明**:
|
||||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- bind:[入参] 指向一个有效的 TAOS_MULTI_BIND 结构体指针,该结构体包含了要绑定到 SQL 语句中的参数列表。需保证此数组中的元素数量和顺序与 SQL 语句中的参数完全一致。TAOS_MULTI_BIND 的使用方法与 MySQL 中的 MYSQL_BIND 类似。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)`
|
||||
- **接口说明**:(2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值)当 SQL 语句中的表名使用了 `?` 占位时,可以使用此函数绑定一个具体的表名。
|
||||
- **参数说明**:
|
||||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- name:[入参] 指向一个包含子表名称的字符串常量。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)`
|
||||
- **接口说明**:(2.1.2.0 版本新增,仅支持用于替换 INSERT 语句中的参数值)当 SQL 语句中的表名和 TAGS 都使用了 `?` 占位时,可以使用此函数绑定具体的表名和具体的 TAGS 取值。最典型的使用场景是使用了自动建表功能的 INSERT 语句(目前版本不支持指定具体的 TAGS 列)。TAGS 参数中的列数量需要与 SQL 语句中要求的 TAGS 数量完全一致。
|
||||
|
@ -469,24 +438,24 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- name:[入参] 指向一个包含子表名称的字符串常量。
|
||||
- tags:[入参] 指向一个有效的 TAOS_MULTI_BIND 结构体指针,该结构体包含了子表标签的值。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind)`
|
||||
- **接口说明**:(2.1.1.0 版本新增,仅支持用于替换 INSERT 语句中的参数值)以多列的方式传递待绑定的数据,需要保证这里传递的数据列的顺序、列的数量与 SQL 语句中的 VALUES 参数完全一致。
|
||||
- **参数说明**:
|
||||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- bind:[入参] 指向一个有效的 TAOS_MULTI_BIND 结构体指针,该结构体包含了要批量绑定到 SQL 语句中的参数列表。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
|
||||
- **接口说明**:将当前绑定的参数加入批处理中,调用此函数后,可以再次调用 `taos_stmt_bind_param()` 或 `taos_stmt_bind_param_batch()` 绑定新的参数。需要注意,此函数仅支持 INSERT/IMPORT 语句,如果是 SELECT 等其他 SQL 语句,将返回错误。
|
||||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_execute(TAOS_STMT *stmt)`
|
||||
- **接口说明**:执行准备好的语句。目前,一条语句只能执行一次。
|
||||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `int taos_stmt_affected_rows(TAOS_STMT *stmt)`
|
||||
- **接口说明**:获取执行预编译 SQL 语句后受影响的行数。
|
||||
|
@ -506,7 +475,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- `int taos_stmt_close(TAOS_STMT *stmt)`
|
||||
- **接口说明**:执行完毕,释放所有资源。
|
||||
- stmt:[入参] 指向一个有效的预编译的 SQL 语句对象指针。
|
||||
- **返回值**:`0`:成功。`非0`:失败,详情请参考错误码页面。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,详情请参考错误码页面。
|
||||
|
||||
- `char * taos_stmt_errstr(TAOS_STMT *stmt)`
|
||||
- **接口说明**:(2.1.3.0 版本新增)用于在其他 STMT API 返回错误(返回错误码或空指针)时获取错误信息。
|
||||
|
@ -680,7 +649,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- **接口说明**:用于向 tmq_list_t 结构体中添加一个 topic。
|
||||
- list:[入参] 指向一个有效的 tmq_list_t 结构体指针,该结构体代表一个 TMQ 列表对象。
|
||||
- topic:[入参] topic 名称。
|
||||
- **返回值**:`0`:成功。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `void tmq_list_destroy(tmq_list_t *list)`
|
||||
- **接口说明**:用于销毁 tmq_list_t 结构体,tmq_list_new 的结果需要通过该接口销毁。
|
||||
|
@ -707,18 +676,18 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- **接口说明**:用于订阅 topic 列表,消费完数据后,需调用 tmq_subscribe 取消订阅。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- topic_list:[入参] 指向一个有效的 tmq_list_t 结构体指针,该结构体包含一个或多个主题名称。
|
||||
- **返回值**:`0`:成功。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `int32_t tmq_unsubscribe(tmq_t *tmq)`
|
||||
- **接口说明**:用于取消订阅的 topic 列表。需与 tmq_subscribe 配合使用。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- **返回值**:`0`:成功。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topic_list)`
|
||||
- **接口说明**:用于获取订阅的 topic 列表。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- topic_list:[出参] 指向一个 tmq_list_t 结构体指针的指针,用于接收当前订阅的主题列表。
|
||||
- **返回值**:`0`:成功。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout)`
|
||||
- **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。
|
||||
|
@ -729,7 +698,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- `int32_t tmq_consumer_close(tmq_t *tmq)`
|
||||
- **接口说明**:用于关闭 tmq_t 结构体。需与 tmq_consumer_new 配合使用。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- **返回值**:`0`:成功。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
|
||||
- **接口说明**:返回当前 consumer 分配的 vgroup 的信息,每个 vgroup 的信息包括 vgId,wal 的最大最小 offset,以及当前消费到的 offset。
|
||||
|
@ -737,7 +706,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- pTopicName:[入参] 要查询分配信息的主题名称。
|
||||
- assignment:[出参] 指向一个 tmq_topic_assignment 结构体指针的指针,用于接收分配信息。数据大小为 numOfAssignment,需要通过 tmq_free_assignment 接口释放。
|
||||
- numOfAssignment:[出参] 指向一个整数指针,用于接收分配给该 consumer 有效的 vgroup 个数。
|
||||
- **返回值**:`0`:成功。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
|
||||
- **接口说明**:返回当前consumer分配的vgroup的信息,每个vgroup的信息包括vgId,wal的最大最小offset,以及当前消费到的offset。
|
||||
|
@ -754,7 +723,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- **接口说明**:同步提交 TMQ 消费者对象处理的消息偏移量。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- msg:[入参] 指向一个有效的 TAOS_RES 结构体指针,该结构体包含了已处理的消息。如果为 NULL,提交当前 consumer 所有消费的 vgroup 的当前进度。
|
||||
- **返回值**:`0`:成功,已经成功提交偏移量。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功,已经成功提交偏移量。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
|
||||
- **接口说明**:异步提交 TMQ 消费者对象处理的消息偏移量。
|
||||
|
@ -767,7 +736,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- **接口说明**:同步提交 TMQ 消费者对象的特定主题和 vgroup 的偏移量。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- pTopicName:[入参] 要提交偏移量的主题名称。
|
||||
- **返回值**:`0`:成功,已经成功提交偏移量。`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功,已经成功提交偏移量。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)`
|
||||
- **接口说明**:异步提交 TMQ 消费者对象的特定主题和 vgroup 的偏移量。
|
||||
|
@ -796,7 +765,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- pTopicName:[入参] 要查询当前位置的主题名称。
|
||||
- vgId:[入参] 虚拟组 vgroup 的 ID。
|
||||
- offset:[入参] 虚拟组 vgroup 的 ID。
|
||||
- **返回值**:`0`:成功,`非0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
- **返回值**:`0`:成功,非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。
|
||||
|
||||
- `int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
|
||||
- **接口说明**:从 TMQ 消费者获取的消息结果中提取虚拟组(vgroup)的当前消费数据位置的偏移量。
|
||||
|
@ -811,12 +780,12 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- `TAOS *tmq_get_connect(tmq_t *tmq)`
|
||||
- **接口说明**:从 TMQ 消费者对象中获取与 TDengine 数据库的连接句柄。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- **返回值**:非NULL`:成功,返回一个 TAOS * 类型的指针,指向与 TDengine 数据库的连接句柄。`NULL`:失败,非法的输入参数。
|
||||
- **返回值**:非 `NULL`:成功,返回一个 TAOS * 类型的指针,指向与 TDengine 数据库的连接句柄。`NULL`:失败,非法的输入参数。
|
||||
|
||||
- `const char *tmq_get_table_name(TAOS_RES *res)`
|
||||
- **接口说明**:从 TMQ 消费者获取的消息结果中获取所属的的表名。
|
||||
- res:[入参] 指向一个有效的 TAOS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
|
||||
- **返回值**:非NULL`:成功,返回一个 const char * 类型的指针,指向表名字符串。`NULL`:失败,非法的输入参数。
|
||||
- **返回值**:非 `NULL`:成功,返回一个 const char * 类型的指针,指向表名字符串。`NULL`:失败,非法的输入参数。
|
||||
|
||||
- `tmq_res_t tmq_get_res_type(TAOS_RES *res)`
|
||||
- **接口说明**:从 TMQ 消费者获取的消息结果中获取消息类型。
|
||||
|
@ -835,9 +804,9 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- `const char *tmq_get_topic_name(TAOS_RES *res)`
|
||||
- **接口说明**:从 TMQ 消费者获取的消息结果中获取所属的 topic 名称。
|
||||
- res:[入参] 指向一个有效的 TAOS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
|
||||
- **返回值**:非NULL`:成功,返回一个 const char * 类型的指针,指向 topic 名称字符串。`NULL`:失败,非法的输入参数。
|
||||
- **返回值**:非 `NULL`:成功,返回一个 const char * 类型的指针,指向 topic 名称字符串。`NULL`:失败,非法的输入参数。
|
||||
|
||||
- `const char *tmq_get_db_name(TAOS_RES *res)`
|
||||
- **接口说明**:从 TMQ 消费者获取的消息结果中获取所属的数据库名称。
|
||||
- res:[入参] 指向一个有效的 TAOS_RES 结构体指针,该结构体包含了从 TMQ 消费者轮询得到的消息。
|
||||
- **返回值**:非NULL`:成功,返回一个 const char * 类型的指针,指向数据库名称字符串。`NULL`:失败,非法的输入参数。
|
||||
- **返回值**:非 `NULL`:成功,返回一个 const char * 类型的指针,指向数据库名称字符串。`NULL`:失败,非法的输入参数。
|
||||
|
|
Loading…
Reference in New Issue