[add tmq cases]
This commit is contained in:
parent
e874173792
commit
ace84f8189
|
@ -37,6 +37,6 @@
|
||||||
|
|
||||||
# ---- tmq
|
# ---- tmq
|
||||||
./test.sh -f tsim/tmq/basic.sim
|
./test.sh -f tsim/tmq/basic.sim
|
||||||
#./test.sh -f tsim/tmq/basic1.sim
|
./test.sh -f tsim/tmq/basic1.sim
|
||||||
|
|
||||||
#======================b1-end===============
|
#======================b1-end===============
|
||||||
|
|
|
@ -26,4 +26,4 @@ run tsim/show/basic.sim
|
||||||
run tsim/table/basic1.sim
|
run tsim/table/basic1.sim
|
||||||
|
|
||||||
run tsim/tmq/basic.sim
|
run tsim/tmq/basic.sim
|
||||||
#run tsim/tmq/basic1.sim
|
run tsim/tmq/basic1.sim
|
||||||
|
|
|
@ -135,42 +135,42 @@ print inserted totalMsgCnt: $totalMsgCnt
|
||||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
|
||||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
|
||||||
print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
if $system_content != @{consume success: 20}@ then
|
if $system_content != @{consume success: 20, 0}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
|
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
|
||||||
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
|
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
|
||||||
#print cmd result----> $system_content
|
#print cmd result----> $system_content
|
||||||
#if $system_content != @{consume success: 20}@ then
|
#if $system_content != @{consume success: 20, 0}@ then
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
|
||||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
|
||||||
print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
if $system_content != @{consume success: 10}@ then
|
if $system_content != @{consume success: 10, 0}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
||||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
|
||||||
print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
if $system_content != @{consume success: 10}@ then
|
if $system_content != @{consume success: 10, 0}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
||||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
|
||||||
print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
if $system_content != @{consume success: 20}@ then
|
if $system_content != @{consume success: 20, 0}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
||||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
|
||||||
print cmd result----> $system_content
|
print cmd result----> $system_content
|
||||||
if $system_content != @{consume success: 20}@ then
|
if $system_content != @{consume success: 20, 0}@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -219,33 +219,33 @@ tmq_list_t* build_topic_list() {
|
||||||
return topic_list;
|
return topic_list;
|
||||||
}
|
}
|
||||||
|
|
||||||
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
void loop_consume(tmq_t* tmq) {
|
||||||
tmq_resp_err_t err;
|
tmq_resp_err_t err;
|
||||||
|
|
||||||
if ((err = tmq_subscribe(tmq, topics))) {
|
|
||||||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t totalMsgs = 0;
|
int32_t totalMsgs = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
int32_t skipLogNum = 0;
|
int32_t skipLogNum = 0;
|
||||||
//int64_t startTime = taosGetTimestampUs();
|
|
||||||
while (running) {
|
while (running) {
|
||||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
|
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 1);
|
||||||
if (tmqmessage) {
|
if (tmqMsg) {
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
skipLogNum += tmqGetSkipLogNum(tmqmessage);
|
|
||||||
if (0 != g_stConfInfo.showMsgFlag) {
|
#if 0
|
||||||
msg_process(tmqmessage);
|
TAOS_ROW row;
|
||||||
|
while (NULL != (row = tmq_get_row(tmqMsg))) {
|
||||||
|
totalRows++;
|
||||||
}
|
}
|
||||||
tmq_message_destroy(tmqmessage);
|
#endif
|
||||||
|
|
||||||
|
skipLogNum += tmqGetSkipLogNum(tmqMsg);
|
||||||
|
if (0 != g_stConfInfo.showMsgFlag) {
|
||||||
|
msg_process(tmqMsg);
|
||||||
|
}
|
||||||
|
tmq_message_destroy(tmqMsg);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//int64_t endTime = taosGetTimestampUs();
|
|
||||||
//double consumeTime = (double)(endTime - startTime) / 1000000;
|
|
||||||
|
|
||||||
|
|
||||||
err = tmq_consumer_close(tmq);
|
err = tmq_consumer_close(tmq);
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -253,7 +253,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("{consume success: %d}", totalMsgs);
|
printf("{consume success: %d, %d}", totalMsgs, totalRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int32_t argc, char *argv[]) {
|
int main(int32_t argc, char *argv[]) {
|
||||||
|
@ -266,7 +266,21 @@ int main(int32_t argc, char *argv[]) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
perf_loop(tmq, topic_list);
|
tmq_resp_err_t err = tmq_subscribe(tmq, topic_list);
|
||||||
|
if (err) {
|
||||||
|
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
loop_consume(tmq);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
err = tmq_unsubscribe(tmq);
|
||||||
|
if (err) {
|
||||||
|
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue