c language sample optimization

This commit is contained in:
Yaming Pei 2024-08-12 15:54:49 +08:00
parent dd3e92d707
commit 4d595d28c1
1 changed files with 32 additions and 26 deletions

View File

@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// to compile: gcc -o tmq_demo tmq_demo.c -ltaos -lpthread
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
@ -33,9 +35,9 @@ void* prepare_data(void* arg) {
uint16_t port = 6030; uint16_t port = 6030;
TAOS *pConn = taos_connect(host, user, password, NULL, port); TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); fprintf(stderr, "Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return NULL;
} }
TAOS_RES* pRes; TAOS_RES* pRes;
@ -52,12 +54,12 @@ void* prepare_data(void* arg) {
pRes = taos_query(pConn, buf); pRes = taos_query(pConn, buf);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("Failed to insert data to power.meters, reason: %s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to insert data to power.meters, reason: %s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
sleep(1); sleep(1);
} }
printf("Prepare data thread exit\n"); fprintf(stdout, "Prepare data thread exit\n");
return NULL; return NULL;
} }
@ -69,9 +71,9 @@ static int32_t msg_process(TAOS_RES* msg) {
const char* dbName = tmq_get_db_name(msg); const char* dbName = tmq_get_db_name(msg);
int32_t vgroupId = tmq_get_vgroup_id(msg); int32_t vgroupId = tmq_get_vgroup_id(msg);
printf("topic: %s\n", topicName); fprintf(stdout, "topic: %s\n", topicName);
printf("db: %s\n", dbName); fprintf(stdout, "db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId); fprintf(stdout, "vgroup id: %d\n", vgroupId);
while (1) { while (1) {
// get one row data from message // get one row data from message
@ -87,11 +89,11 @@ static int32_t msg_process(TAOS_RES* msg) {
rows++; rows++;
// print the row content // print the row content
if (taos_print_row(buf, row, fields, numOfFields) < 0) { if (taos_print_row(buf, row, fields, numOfFields) < 0) {
printf("Failed to print row\n"); fprintf(stderr, "Failed to print row\n");
break; break;
} }
// print the precision and row content to the console // print the precision and row content to the console
printf("precision: %d, row content: %s\n", precision, buf); fprintf(stdout, "precision: %d, data: %s\n", precision, buf);
} }
return rows; return rows;
@ -105,42 +107,42 @@ static int32_t init_env() {
uint16_t port = 6030; uint16_t port = 6030;
TAOS *pConn = taos_connect(host, user, password, NULL, port); TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); fprintf(stderr, "Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
TAOS_RES* pRes; TAOS_RES* pRes;
// drop database if exists // drop database if exists
printf("create database\n"); fprintf(stdout, "Create database.\n");
pRes = taos_query(pConn, "drop topic if exists topic_meters"); pRes = taos_query(pConn, "drop topic if exists topic_meters");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in drop topic_meters, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to drop topic_meters, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists power"); pRes = taos_query(pConn, "drop database if exists power");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in drop power, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to drop database power, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); taos_free_result(pRes);
// create database // create database
pRes = taos_query(pConn, "create database power precision 'ms' WAL_RETENTION_PERIOD 3600"); pRes = taos_query(pConn, "create database power precision 'ms' WAL_RETENTION_PERIOD 3600");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to create tmqdb, reason:%s\n", taos_errstr(pRes));
goto END; goto END;
} }
taos_free_result(pRes); taos_free_result(pRes);
// create super table // create super table
printf("create super table\n"); fprintf(stdout, "Create super table.\n");
pRes = taos_query( pRes = taos_query(
pConn, pConn,
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS " "CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS "
"(groupId INT, location BINARY(24))"); "(groupId INT, location BINARY(24))");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to create super table meters, reason:%s\n", taos_errstr(pRes));
goto END; goto END;
} }
@ -155,7 +157,7 @@ END:
} }
int32_t create_topic() { int32_t create_topic() {
printf("create topic\n"); fprintf(stdout, "Create topic.\n");
TAOS_RES* pRes; TAOS_RES* pRes;
const char *host = "localhost"; const char *host = "localhost";
const char *user = "root"; const char *user = "root";
@ -163,14 +165,14 @@ int32_t create_topic() {
uint16_t port = 6030; uint16_t port = 6030;
TAOS *pConn = taos_connect(host, user, password, NULL, port); TAOS *pConn = taos_connect(host, user, password, NULL, port);
if (pConn == NULL) { if (pConn == NULL) {
printf("Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL)); fprintf(stderr, "Failed to connect to %s:%hu; ErrCode: 0x%x; ErrMessage: %s.\n", host, port, taos_errno(NULL), taos_errstr(NULL));
taos_cleanup(); taos_cleanup();
return -1; return -1;
} }
pRes = taos_query(pConn, "use power"); pRes = taos_query(pConn, "use power");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use tmqdb, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to use tmqdb, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -179,7 +181,7 @@ int32_t create_topic() {
pConn, pConn,
"CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters"); "CREATE TOPIC IF NOT EXISTS topic_meters AS SELECT ts, current, voltage, phase, groupid, location FROM meters");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_meters, reason:%s\n", taos_errstr(pRes)); fprintf(stderr, "Failed to create topic topic_meters, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -189,7 +191,7 @@ int32_t create_topic() {
} }
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param); fprintf(stdout, "tmq_commit_cb_print() code: %d, tmq: %p, param: %p\n", code, tmq, param);
} }
// ANCHOR: create_consumer_1 // ANCHOR: create_consumer_1
@ -261,6 +263,7 @@ tmq_list_t* build_topic_list() {
if (code) { if (code) {
// if failed, destroy the list and return NULL // if failed, destroy the list and return NULL
tmq_list_destroy(topicList); tmq_list_destroy(topicList);
fprintf(stderr, "Failed to create topic_list, ErrCode: 0x%x; ErrMessage: %s.\n", code, tmq_err2str(code));
return NULL; return NULL;
} }
// if success, return the list // if success, return the list
@ -313,7 +316,9 @@ void consume_repeatly(tmq_t* tmq) {
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) { if (code != 0) {
fprintf(stderr, "Failed to seek to %d, reason:%s", (int)p->begin, tmq_err2str(code)); fprintf(stderr, "Failed to seek assignment %d to beginning %ld, reason: %s.\n", i, p->begin, tmq_err2str(code));
} else {
fprintf(stdout, "Seek assignment %d to beginning %ld successfully.\n", i, p->begin);
} }
} }
@ -347,7 +352,7 @@ void manual_commit(tmq_t* tmq) {
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
break; break;
} else { } else {
printf("Commit offset manually successfully."); fprintf(stdout, "Commit offset manually successfully.");
} }
// free the message // free the message
taos_free_result(tmqmsg); taos_free_result(tmqmsg);
@ -386,20 +391,21 @@ int main(int argc, char* argv[]) {
fprintf(stderr, "Failed to create consumer.\n"); fprintf(stderr, "Failed to create consumer.\n");
return -1; return -1;
} }
printf("Create consumer successfully.\n"); fprintf(stdout, "Create consumer successfully.\n");
// ANCHOR_END: create_consumer_2 // ANCHOR_END: create_consumer_2
// ANCHOR: subscribe_3 // ANCHOR: subscribe_3
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
if (NULL == topic_list) { if (NULL == topic_list) {
fprintf(stderr, "Failed to create topic_list.\n");
return -1; return -1;
} }
if ((code = tmq_subscribe(tmq, topic_list))) { if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to subscribe tmq_subscribe(): %s\n", tmq_err2str(code));
} else { } else {
printf("Subscribe topics successfully.\n"); fprintf(stdout, "Subscribe topics successfully.\n");
} }
tmq_list_destroy(topic_list); tmq_list_destroy(topic_list);