diff --git a/docs/examples/c/tmq_demo.c b/docs/examples/c/tmq_demo.c
index 30cbbdc1c0..6277d61b8a 100644
--- a/docs/examples/c/tmq_demo.c
+++ b/docs/examples/c/tmq_demo.c
@@ -13,6 +13,8 @@
* along with this program. If not, see .
*/
+// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
+
#include
#include
#include
@@ -33,9 +35,9 @@ void* prepare_data(void* arg) {
uint16_t port = 6030;
TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
- printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
+ fprintf(stderr, "Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
- return -1;
+ return NULL;
}
TAOS_RES* pRes;
@@ -52,12 +54,12 @@ void* prepare_data(void* arg) {
pRes = taos_query(pConn, buf);
if (taos_errno(pRes) != 0) {
- printf("Failed to insert data to power.meters, reason: %s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to insert data to power.meters, reason: %s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
sleep(1);
}
- printf("Prepare data thread exit\n");
+ fprintf(stdout, "Prepare data thread exit\n");
return NULL;
}
@@ -69,9 +71,9 @@ static int32_t msg_process(TAOS_RES* msg) {
const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg);
- printf("topic: %s\n", topicName);
- printf("db: %s\n", dbName);
- printf("vgroup id: %d\n", vgroupId);
+ fprintf(stdout, "topic: %s\n", topicName);
+ fprintf(stdout, "db: %s\n", dbName);
+ fprintf(stdout, "vgroup id: %d\n", vgroupId);
while (1) {
// get one row data from message
@@ -87,11 +89,11 @@ static int32_t msg_process(TAOS_RES* msg) {
rows++;
// print the row content
if (taos_print_row(buf, row, fields, numOfFields) < 0) {
- printf("Failed to print row\n");
+ fprintf(stderr, "Failed to print row\n");
break;
}
// print the precision and row content to the console
- printf("precision: %d, row content: %s\n", precision, buf);
+ fprintf(stdout, "precision: %d, data: %s\n", precision, buf);
}
return rows;
@@ -105,42 +107,42 @@ static int32_t init_env() {
uint16_t port = 6030;
TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
- printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
+ fprintf(stderr, "Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return -1;
}
TAOS_RES* pRes;
// drop database if exists
- printf("create database\n");
+ fprintf(stdout, "Create database.\n");
pRes = taos_query(pConn, "drop topic if exists topic_meters");
if (taos_errno(pRes) != 0) {
- printf("error in drop topic_meters, reason:%s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to drop topic_meters, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists power");
if (taos_errno(pRes) != 0) {
- printf("error in drop power, reason:%s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to drop database power, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
// create database
pRes = taos_query(pConn, "create database power precision 'ms' WAL_RETENTION_PERIOD 3600");
if (taos_errno(pRes) != 0) {
- printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to create tmqdb, reason:%s\n", taos_errstr(pRes));
goto END;
}
taos_free_result(pRes);
// create super table
- printf("create super table\n");
+ fprintf(stdout, "Create super table.\n");
pRes = taos_query(
pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))");
if (taos_errno(pRes) != 0) {
- printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to create super table meters, reason:%s\n", taos_errstr(pRes));
goto END;
}
@@ -155,7 +157,7 @@ END:
}
int32_t create_topic() {
- printf("create topic\n");
+ fprintf(stdout, "Create topic.\n");
TAOS_RES* pRes;
const char *host = "localhost";
const char *user = "root";
@@ -163,14 +165,14 @@ int32_t create_topic() {
uint16_t port = 6030;
TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) {
- printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
+ fprintf(stderr, "Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup();
return -1;
}
pRes = taos_query(pConn, "use power");
if (taos_errno(pRes) != 0) {
- printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
@@ -179,7 +181,7 @@ int32_t create_topic() {
pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
if (taos_errno(pRes) != 0) {
- printf("failed to create topic topic_meters, reason:%s\n", taos_errstr(pRes));
+ fprintf(stderr, "Failed to create topic topic_meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
@@ -189,7 +191,7 @@ int32_t create_topic() {
}
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
- printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
+ fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
}
// ANCHOR: create_consumer_1
@@ -261,6 +263,7 @@ tmq_list_t* build_topic_list() {
if (code) {
// if failed, destroy the list and return NULL
tmq_list_destroy(topicList);
+ fprintf(stderr, "Failed to create topic_list, ErrCode: 0x%x; ErrMessage: %s.\n", code, tmq_err2str(code));
return NULL;
}
// if success, return the list
@@ -313,7 +316,9 @@ void consume_repeatly(tmq_t* tmq) {
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
- fprintf(stderr, "Failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code));
+ fprintf(stderr, "Failed to seek assignment %d to beginning %ld, reason: %s.\n", i, p->begin, tmq_err2str(code));
+ } else {
+ fprintf(stdout, "Seek assignment %d to beginning %ld successfully.\n", i, p->begin);
}
}
@@ -347,7 +352,7 @@ void manual_commit(tmq_t* tmq) {
taos_free_result(tmqmsg);
break;
} else {
- printf("Commit offset manually successfully.");
+ fprintf(stdout, "Commit offset manually successfully.");
}
// free the message
taos_free_result(tmqmsg);
@@ -386,20 +391,21 @@ int main(int argc, char* argv[]) {
fprintf(stderr, "Failed to create consumer.\n");
return -1;
}
- printf("Create consumer successfully.\n");
+ fprintf(stdout, "Create consumer successfully.\n");
// ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) {
+ fprintf(stderr, "Failed to create topic_list.\n");
return -1;
}
if ((code = tmq_subscribe(tmq, topic_list))) {
- fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
+ fprintf(stderr, "Failed to subscribe tmq_subscribe(): %s\n", tmq_err2str(code));
} else {
- printf("Subscribe topics successfully.\n");
+ fprintf(stdout, "Subscribe topics successfully.\n");
}
tmq_list_destroy(topic_list);