test: add tmq test cases
This commit is contained in:
parent
925ac93e38
commit
18090bdf6f
|
@ -0,0 +1,267 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
# scene1: vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# scene2: vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# scene3: vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# scene4: vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
|
||||
######## This test case include scene1 and scene3
|
||||
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
$loop_cnt = 0
|
||||
check_dnode_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != ready then
|
||||
goto check_dnode_ready
|
||||
endi
|
||||
|
||||
sql connect
|
||||
|
||||
$loop_cnt = 0
|
||||
$vgroups = 1
|
||||
$dbNamme = d0
|
||||
loop_vgroups:
|
||||
print =============== create database $dbNamme vgroups $vgroups
|
||||
sql create database $dbNamme vgroups $vgroups
|
||||
sql show databases
|
||||
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
|
||||
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data02 != 1 then # vgroups
|
||||
print vgroups: $data02
|
||||
return -1
|
||||
endi
|
||||
else
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 == d1 then
|
||||
if $data02 != 4 then # vgroups
|
||||
print vgroups: $data02
|
||||
return -1
|
||||
endi
|
||||
else
|
||||
if $data12 != 4 then # vgroups
|
||||
print vgroups: $data12
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
endi
|
||||
|
||||
sql use $dbNamme
|
||||
|
||||
print =============== create super table
|
||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create child table
|
||||
$tbPrefix = ct
|
||||
$tbNum = 2
|
||||
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using stb tags( $i )
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
print =============== create normal table
|
||||
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
|
||||
|
||||
print =============== create multi topics. notes: now only support:
|
||||
print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb
|
||||
print =============== will support: * from stb
|
||||
|
||||
sql create topic topic_stb_column as select ts, c1, c3 from stb
|
||||
#sql create topic topic_stb_all as select * from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
|
||||
sql create topic topic_ctb_all as select * from ct0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
|
||||
|
||||
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
|
||||
sql create topic topic_ntb_all as select * from ntb
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb
|
||||
|
||||
sql show tables
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== run_back insert data
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
run_back tsim/tmq/insertDataV1.sim
|
||||
else
|
||||
run_back tsim/tmq/insertDataV4.sim
|
||||
endi
|
||||
|
||||
sleep 1000
|
||||
|
||||
#$rowNum = 1000
|
||||
#$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#
|
||||
#$i = 0
|
||||
#while $i < $tbNum
|
||||
# $tb = $tbPrefix . $i
|
||||
#
|
||||
# $x = 0
|
||||
# while $x < $rowNum
|
||||
# $c = $x / 10
|
||||
# $c = $c * 10
|
||||
# $c = $x - $c
|
||||
#
|
||||
# $binary = ' . binary
|
||||
# $binary = $binary . $c
|
||||
# $binary = $binary . '
|
||||
#
|
||||
# sql insert into $tb values ($tstart , $c , $x , $binary )
|
||||
# sql insert into ntb values ($tstart , $c , $x , $binary )
|
||||
# $tstart = $tstart + 1
|
||||
# $x = $x + 1
|
||||
# endw
|
||||
#
|
||||
# $i = $i + 1
|
||||
# $tstart = 1640966400000
|
||||
#endw
|
||||
|
||||
#root@trd02 /home $ tmq_sim --help
|
||||
# -c Configuration directory, default is
|
||||
# -d The name of the database for cosumer, no default
|
||||
# -t The topic string for cosumer, no default
|
||||
# -k The key-value string for cosumer, no default
|
||||
# -g showMsgFlag, default is 0
|
||||
#
|
||||
|
||||
$consumeDelay = 5000
|
||||
$expectConsumeMsgCnt = 1000
|
||||
print expectConsumeMsgCnt: $expectConsumeMsgCnt, consumeDelay: $consumeDelay
|
||||
|
||||
# supported key:
|
||||
# group.id:<xxx>
|
||||
# enable.auto.commit:<true | false>
|
||||
# auto.offset.reset:<earliest | latest | none>
|
||||
# td.connect.ip:<fqdn | ipaddress>
|
||||
# td.connect.user:root
|
||||
# td.connect.pass:taosdata
|
||||
# td.connect.port:6030
|
||||
# td.connect.db:db
|
||||
|
||||
$expect_result = @{consume success: @
|
||||
$expect_result = $expect_result . $expectConsumeMsgCnt
|
||||
$expect_result = $expect_result . @, @
|
||||
$expect_result = $expect_result . 0}
|
||||
print expect_result----> $expect_result
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
#print cmd result----> $system_content
|
||||
##if $system_content != @{consume success: 10000, 0}@ then
|
||||
#if $system_content != $expect_result then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
#if $system_content != @{consume success: 10000, 0}@ then
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
$expect_result = @{consume success: @
|
||||
$expect_result = $expect_result . $rowNum
|
||||
$expect_result = $expect_result . @, @
|
||||
$expect_result = $expect_result . 0}
|
||||
print expect_result----> $expect_result
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
$expect_result = @{consume success: @
|
||||
$expect_result = $expect_result . $totalMsgCnt
|
||||
$expect_result = $expect_result . @, @
|
||||
$expect_result = $expect_result . 0}
|
||||
print expect_result----> $expect_result
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
$loop_cnt = 1
|
||||
$vgroups = 4
|
||||
$dbNamme = d1
|
||||
goto loop_vgroups
|
||||
endi
|
||||
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
sql connect
|
||||
|
||||
print ================ insert data
|
||||
$dbNamme = d0
|
||||
$tbPrefix = ct
|
||||
$tbNum = 2
|
||||
$rowNum = 100
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
|
||||
sql use $dbNamme
|
||||
|
||||
loop_insert:
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$c = $x / 10
|
||||
$c = $c * 10
|
||||
$c = $x - $c
|
||||
|
||||
$binary = ' . binary
|
||||
$binary = $binary . $c
|
||||
$binary = $binary . '
|
||||
|
||||
#print ====> insert into $tb values ($tstart , $c , $x , $binary )
|
||||
#print ====> insert into ntb values ($tstart , $c , $x , $binary )
|
||||
sql insert into $tb values ($tstart , $c , $x , $binary )
|
||||
sql insert into ntb values ($tstart , $c , $x , $binary )
|
||||
$tstart = $tstart + 1
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
$tstart = 1640966400000
|
||||
endw
|
||||
goto loop_insert
|
||||
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
sql connect
|
||||
|
||||
print ================ insert data
|
||||
$dbNamme = d1
|
||||
$tbPrefix = ct
|
||||
$tbNum = 2
|
||||
$rowNum = 100
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
|
||||
sql use $dbNamme
|
||||
|
||||
loop_insert:
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$c = $x / 10
|
||||
$c = $c * 10
|
||||
$c = $x - $c
|
||||
|
||||
$binary = ' . binary
|
||||
$binary = $binary . $c
|
||||
$binary = $binary . '
|
||||
|
||||
#print ====> insert into $tb values ($tstart , $c , $x , $binary )
|
||||
#print ====> insert into ntb values ($tstart , $c , $x , $binary )
|
||||
sql insert into $tb values ($tstart , $c , $x , $binary )
|
||||
sql insert into ntb values ($tstart , $c , $x , $binary )
|
||||
$tstart = $tstart + 1
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
$tstart = 1640966400000
|
||||
endw
|
||||
goto loop_insert
|
||||
|
||||
|
|
@ -42,6 +42,8 @@ typedef struct {
|
|||
char topicString[256];
|
||||
char keyString[1024];
|
||||
int32_t showMsgFlag;
|
||||
int32_t consumeDelay; // unit s
|
||||
int32_t consumeMsgCnt;
|
||||
|
||||
// save result after parse agrvs
|
||||
int32_t numOfTopic;
|
||||
|
@ -71,12 +73,19 @@ static void printHelp() {
|
|||
printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-g");
|
||||
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
||||
printf("%s%s\n", indent, "-y");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
||||
printf("%s%s\n", indent, "-m");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
void parseArgument(int32_t argc, char *argv[]) {
|
||||
|
||||
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
|
||||
g_stConfInfo.showMsgFlag = 0;
|
||||
g_stConfInfo.consumeDelay = 8000;
|
||||
g_stConfInfo.consumeMsgCnt = 0;
|
||||
|
||||
for (int32_t i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||
|
@ -92,6 +101,10 @@ void parseArgument(int32_t argc, char *argv[]) {
|
|||
strcpy(g_stConfInfo.keyString, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-g") == 0) {
|
||||
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-y") == 0) {
|
||||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0) {
|
||||
g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
|
||||
} else {
|
||||
printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
||||
exit(-1);
|
||||
|
@ -256,6 +269,48 @@ void loop_consume(tmq_t* tmq) {
|
|||
printf("{consume success: %d, %d}", totalMsgs, totalRows);
|
||||
}
|
||||
|
||||
|
||||
void parallel_consume(tmq_t* tmq) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
while (running) {
|
||||
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
|
||||
if (tmqMsg) {
|
||||
totalMsgs++;
|
||||
|
||||
#if 0
|
||||
TAOS_ROW row;
|
||||
while (NULL != (row = tmq_get_row(tmqMsg))) {
|
||||
totalRows++;
|
||||
}
|
||||
#endif
|
||||
|
||||
skipLogNum += tmqGetSkipLogNum(tmqMsg);
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
msg_process(tmqMsg);
|
||||
}
|
||||
tmq_message_destroy(tmqMsg);
|
||||
|
||||
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err) {
|
||||
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
printf("%d", totalMsgs); // output to sim for check result
|
||||
}
|
||||
|
||||
int main(int32_t argc, char *argv[]) {
|
||||
parseArgument(argc, argv);
|
||||
parseInputString();
|
||||
|
@ -271,8 +326,12 @@ int main(int32_t argc, char *argv[]) {
|
|||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
loop_consume(tmq);
|
||||
|
||||
if (0 == g_stConfInfo.consumeMsgCnt) {
|
||||
loop_consume(tmq);
|
||||
} else {
|
||||
parallel_consume(tmq);
|
||||
}
|
||||
|
||||
err = tmq_unsubscribe(tmq);
|
||||
if (err) {
|
||||
|
|
Loading…
Reference in New Issue