Merge branch '3.0' into feature/3.0_liaohj
This commit is contained in:
commit
8d435a043a
|
@ -63,7 +63,7 @@ int32_t init_env() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
const char* sql = "select * from tu2";
|
||||
const char* sql = "select * from tu1";
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
|
||||
|
@ -159,11 +159,45 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
int32_t batchCnt = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
clock_t startTime = clock();
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmqmessage) {
|
||||
batchCnt++;
|
||||
skipLogNum += tmqGetSkipLogNum(tmqmessage);
|
||||
/*msg_process(tmqmessage);*/
|
||||
tmq_message_destroy(tmqmessage);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
clock_t endTime = clock();
|
||||
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
|
||||
(double)(endTime - startTime) / CLOCKS_PER_SEC);
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
int main() {
|
||||
int code;
|
||||
code = init_env();
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
basic_consume_loop(tmq, topic_list);
|
||||
perf_loop(tmq, topic_list);
|
||||
/*basic_consume_loop(tmq, topic_list);*/
|
||||
/*sync_consume_loop(tmq, topic_list);*/
|
||||
}
|
||||
|
|
|
@ -16,8 +16,8 @@
|
|||
#ifndef TDENGINE_TAOS_H
|
||||
#define TDENGINE_TAOS_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -128,7 +128,8 @@ typedef struct TAOS_MULTI_BIND {
|
|||
DLL_EXPORT void taos_cleanup(void);
|
||||
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
|
||||
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
|
||||
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
|
||||
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
|
||||
const char *db, int dbLen, uint16_t port);
|
||||
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
|
||||
DLL_EXPORT void taos_close(TAOS *taos);
|
||||
|
||||
|
@ -182,7 +183,8 @@ DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp,
|
|||
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
|
||||
|
||||
typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
|
||||
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, __taos_sub_fn_t fp, void *param, int interval);
|
||||
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp,
|
||||
void *param, int interval);
|
||||
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
|
||||
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
|
||||
|
||||
|
@ -253,6 +255,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
|
|||
|
||||
// temporary used function for demo only
|
||||
void tmqShowMsg(tmq_message_t *tmq_message);
|
||||
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -452,6 +452,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return 0;
|
||||
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
|
||||
return pRsp->skipLogNum;
|
||||
}
|
||||
|
||||
void tmqShowMsg(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return;
|
||||
|
||||
|
|
Loading…
Reference in New Issue