add demo for tmq
This commit is contained in:
parent
3534852812
commit
174378ad2b
|
@ -28,6 +28,7 @@ endif(${BUILD_TEST})
|
|||
add_subdirectory(source)
|
||||
add_subdirectory(tools)
|
||||
add_subdirectory(tests)
|
||||
add_subdirectory(example)
|
||||
|
||||
# docs
|
||||
add_subdirectory(docs)
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
aux_source_directory(src TMQ_DEMO_SRC)
|
||||
|
||||
add_executable(tmq ${TMQ_DEMO_SRC})
|
||||
target_link_libraries(
|
||||
tmq
|
||||
PUBLIC taos
|
||||
#PUBLIC util
|
||||
#PUBLIC common
|
||||
#PUBLIC os
|
||||
)
|
||||
target_include_directories(
|
||||
tmq
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
)
|
||||
|
||||
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
|
|
@ -0,0 +1,168 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <assert.h>
|
||||
#include "taos.h"
|
||||
|
||||
static int running = 1;
|
||||
static void msg_process(tmq_message_t* message) {
|
||||
tmqShowMsg(message);
|
||||
}
|
||||
|
||||
int32_t init_env() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table tu2 using st1 tags(2)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
|
||||
char* sql = "select * from st1";
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||
/*if (taos_errno(pRes) != 0) {*/
|
||||
/*printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
/*taos_free_result(pRes);*/
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "group.id", "tg2");
|
||||
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
|
||||
return tmq;
|
||||
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "test_stb_topic_1");
|
||||
tmq_subscribe(tmq, topic_list);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "test_stb_topic_1");
|
||||
return topic_list;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t *tmq,
|
||||
tmq_list_t *topics) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
|
||||
while (running) {
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmq) {
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
void sync_consume_loop(tmq_t *rk,
|
||||
tmq_list_t *topics) {
|
||||
static const int MIN_COMMIT_COUNT = 1000;
|
||||
|
||||
int msg_count = 0;
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(rk, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
return;
|
||||
}
|
||||
|
||||
while (running) {
|
||||
tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500);
|
||||
if (tmqmessage) {
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
|
||||
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
|
||||
tmq_commit(rk, NULL, 0);
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(rk);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
int main() {
|
||||
int code;
|
||||
code = init_env();
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
/*basic_consume_loop(tmq, topic_list);*/
|
||||
sync_consume_loop(tmq, topic_list);
|
||||
}
|
|
@ -218,6 +218,7 @@ DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, char *);
|
|||
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
|
||||
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
|
||||
DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message);
|
||||
DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t);
|
||||
|
||||
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
|
||||
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
|
||||
|
@ -226,8 +227,8 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
|
|||
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
|
||||
#endif
|
||||
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
|
||||
#if 0
|
||||
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
|
||||
#if 0
|
||||
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
|
||||
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
|
||||
#endif
|
||||
|
|
|
@ -348,7 +348,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
goto _return;
|
||||
}
|
||||
|
||||
printf("%s\n", pStr);
|
||||
/*printf("%s\n", pStr);*/
|
||||
|
||||
// The topic should be related to a database that the queried table is belonged to.
|
||||
SName name = {0};
|
||||
|
@ -501,7 +501,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
|
||||
SMqClientVg* pVg = pParam->pVg;
|
||||
if (code != 0) {
|
||||
printf("msg discard\n");
|
||||
/*printf("msg discard\n");*/
|
||||
tsem_post(&pParam->rspSem);
|
||||
return 0;
|
||||
}
|
||||
|
@ -512,7 +512,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
return -1;
|
||||
}
|
||||
tDecodeSMqConsumeRsp(pMsg->pData, pRsp);
|
||||
printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);
|
||||
/*printf("rsp %ld %ld\n", pRsp->committedOffset, pRsp->rspOffset);*/
|
||||
if (pRsp->numOfTopics == 0) {
|
||||
/*printf("no data\n");*/
|
||||
free(pRsp);
|
||||
|
@ -766,6 +766,16 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
|
|||
free(tmq_message);
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
|
||||
const char* tmq_err2str(tmq_resp_err_t err) {
|
||||
if (err == TMQ_RESP_ERR__SUCCESS) {
|
||||
return "success";
|
||||
}
|
||||
return "fail";
|
||||
}
|
||||
#if 0
|
||||
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
||||
tmq_t* pTmq = malloc(sizeof(tmq_t));
|
||||
|
|
|
@ -638,6 +638,7 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
|
|||
//if (msg == NULL) break;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST(testCase, tmq_subscribe_stb_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -676,7 +677,6 @@ TEST(testCase, tmq_consume_Test) {
|
|||
|
||||
TEST(testCase, tmq_commit_TEST) {
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
TEST(testCase, projection_query_tables) {
|
||||
|
|
|
@ -583,19 +583,23 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
|
|||
pSub->availConsumer = NULL;
|
||||
}
|
||||
if (pSub->assigned) {
|
||||
taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
//taosArrayDestroyEx(pSub->assigned, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->assigned);
|
||||
pSub->assigned = NULL;
|
||||
}
|
||||
if (pSub->unassignedVg) {
|
||||
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
//taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->unassignedVg);
|
||||
pSub->unassignedVg = NULL;
|
||||
}
|
||||
if (pSub->idleConsumer) {
|
||||
taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
//taosArrayDestroyEx(pSub->idleConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->idleConsumer);
|
||||
pSub->idleConsumer = NULL;
|
||||
}
|
||||
if (pSub->lostConsumer) {
|
||||
taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
//taosArrayDestroyEx(pSub->lostConsumer, (void (*)(void*))tDeleteSMqConsumerEp);
|
||||
taosArrayDestroy(pSub->lostConsumer);
|
||||
pSub->lostConsumer = NULL;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue