From 7b3a8834981eeb64d6abf56ff74f071fdfc72466 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 13 Apr 2022 18:23:05 +0800 Subject: [PATCH 1/3] test: add tmq test cases --- tests/script/tsim/tmq/insertDataV1.sim | 16 +- tests/script/tsim/tmq/insertDataV4.sim | 14 +- tests/script/tsim/tmq/insertFixedDataV2.sim | 51 ++++ tests/script/tsim/tmq/insertFixedDataV4.sim | 51 ++++ .../tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim | 265 +++++++++++++++++ .../tsim/tmq/main2Con1Cgrp1TopicFrStb.sim | 270 ++++++++++++++++++ .../tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim | 265 +++++++++++++++++ .../tsim/tmq/main2Con1Cgrp2TopicFrStb.sim | 270 ++++++++++++++++++ .../tsim/tmq/mainConsumerInMultiTopic.sim | 52 ++-- .../tsim/tmq/mainConsumerInOneTopic.sim | 76 ++--- tests/test/c/tmqSim.c | 175 +++++++++++- 11 files changed, 1424 insertions(+), 81 deletions(-) create mode 100644 tests/script/tsim/tmq/insertFixedDataV2.sim create mode 100644 tests/script/tsim/tmq/insertFixedDataV4.sim create mode 100644 tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim create mode 100644 tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim create mode 100644 tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim create mode 100644 tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim diff --git a/tests/script/tsim/tmq/insertDataV1.sim b/tests/script/tsim/tmq/insertDataV1.sim index a349c55dbd..0df74d53f8 100644 --- a/tests/script/tsim/tmq/insertDataV1.sim +++ b/tests/script/tsim/tmq/insertDataV1.sim @@ -22,23 +22,19 @@ while $i < $tbNum $x = 0 while $x < $rowNum - $c = $x / 10 - $c = $c * 10 - $c = $x - $c - $binary = ' . binary - $binary = $binary . $c + $binary = $binary . $i $binary = $binary . ' - #print ====> insert into $tb values ($tstart , $c , $x , $binary ) - #print ====> insert into ntb values ($tstart , $c , $x , $binary ) - sql insert into $tb values ($tstart , $c , $x , $binary ) - sql insert into ntb values ($tstart , $c , $x , $binary ) + #print ====> insert into $tb values ($tstart , $i , $x , $binary ) + #print ====> insert into ntb values ($tstart , $i , $x , $binary ) + sql insert into $tb values ($tstart , $i , $x , $binary ) + sql insert into ntb values ($tstart , 999 , 999 , 'binary-ntb' ) $tstart = $tstart + 1 $x = $x + 1 endw - #print ====> insert rows: $rowNum into $tb and ntb + print ====> insert rows: $rowNum into $tb and ntb $i = $i + 1 # $tstart = 1640966400000 diff --git a/tests/script/tsim/tmq/insertDataV4.sim b/tests/script/tsim/tmq/insertDataV4.sim index 72c1358e7f..dbd52f56b8 100644 --- a/tests/script/tsim/tmq/insertDataV4.sim +++ b/tests/script/tsim/tmq/insertDataV4.sim @@ -22,18 +22,14 @@ while $i < $tbNum $x = 0 while $x < $rowNum - $c = $x / 10 - $c = $c * 10 - $c = $x - $c - $binary = ' . binary - $binary = $binary . $c + $binary = $binary . $i $binary = $binary . ' - #print ====> insert into $tb values ($tstart , $c , $x , $binary ) - #print ====> insert into ntb values ($tstart , $c , $x , $binary ) - sql insert into $tb values ($tstart , $c , $x , $binary ) - sql insert into ntb values ($tstart , $c , $x , $binary ) + #print ====> insert into $tb values ($tstart , $i , $x , $binary ) + #print ====> insert into ntb values ($tstart , $i , $x , $binary ) + sql insert into $tb values ($tstart , $i , $x , $binary ) + sql insert into ntb values ($tstart , 999 , 999 , 'binary-ntb' ) $tstart = $tstart + 1 $x = $x + 1 endw diff --git a/tests/script/tsim/tmq/insertFixedDataV2.sim b/tests/script/tsim/tmq/insertFixedDataV2.sim new file mode 100644 index 0000000000..a93be3e5a6 --- /dev/null +++ b/tests/script/tsim/tmq/insertFixedDataV2.sim @@ -0,0 +1,51 @@ + +sql connect + +print ================ insert data +$dbNamme = d0 +$tbPrefix = ct +$tbNum = 10 +$rowNum = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$loopInsertNum = 10 + +sql use $dbNamme + +$loopIndex = 0 + +loop_insert: +print ====> loop $loopIndex insert +$loopIndex = $loopIndex + 1 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $binary = ' . binary + $binary = $binary . $i + $binary = $binary . ' + + #print ====> insert into $tb values ($tstart , $i , $x , $binary ) + #print ====> insert into ntb values ($tstart , $i , $x , $binary ) + sql insert into $tb values ($tstart , $i , $x , $binary ) + sql insert into ntb values ($tstart , 999 , 999 , 'binary-ntb' ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + #print ====> insert rows: $rowNum into $tb and ntb + + $i = $i + 1 +# $tstart = 1640966400000 +endw + + +if $loopIndex < $loopInsertNum then + goto loop_insert +endi + +print ====> insert data end =========== + diff --git a/tests/script/tsim/tmq/insertFixedDataV4.sim b/tests/script/tsim/tmq/insertFixedDataV4.sim new file mode 100644 index 0000000000..9f7f86747f --- /dev/null +++ b/tests/script/tsim/tmq/insertFixedDataV4.sim @@ -0,0 +1,51 @@ + +sql connect + +print ================ insert data +$dbNamme = d1 +$tbPrefix = ct +$tbNum = 10 +$rowNum = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +$loopInsertNum = 10 + +sql use $dbNamme + +$loopIndex = 0 + +loop_insert: +print ====> loop $loopIndex insert +$loopIndex = $loopIndex + 1 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $binary = ' . binary + $binary = $binary . $i + $binary = $binary . ' + + #print ====> insert into $tb values ($tstart , $i , $x , $binary ) + #print ====> insert into ntb values ($tstart , $i , $x , $binary ) + sql insert into $tb values ($tstart , $i , $x , $binary ) + sql insert into ntb values ($tstart , 999 , 999 , 'binary-ntb' ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + #print ====> insert rows: $rowNum into $tb and ntb + + $i = $i + 1 +# $tstart = 1640966400000 +endw + + +if $loopIndex < $loopInsertNum then + goto loop_insert +endi + +print ====> insert data end =========== + diff --git a/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim new file mode 100644 index 0000000000..c301219ad6 --- /dev/null +++ b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim @@ -0,0 +1,265 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +# scene1: vgroups=2, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene2: vgroups=2, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene3: vgroups=4, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene4: vgroups=4, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## +######## This test case include scene1 and scene3 +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$loop_cnt = 0 +$vgroups = 2 +$dbNamme = d0 +loop_vgroups: +print =============== create database $dbNamme vgroups $vgroups +sql create database $dbNamme vgroups $vgroups +sql show databases +print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 +print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19 +print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 + +if $loop_cnt == 0 then + if $rows != 2 then + return -1 + endi + if $data02 != 2 then # vgroups + print vgroups: $data02 + return -1 + endi +else + if $rows != 3 then + return -1 + endi + if $data00 == d1 then + if $data02 != 4 then # vgroups + print vgroups: $data02 + return -1 + endi + else + if $data12 != 4 then # vgroups + print vgroups: $data12 + return -1 + endi + endi +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +$tbPrefix = ct +$tbNum = 100 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using stb tags( $i ) + $i = $i + 1 +endw + +print =============== create normal table +sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +print =============== create topics from child table + +sql create topic topic_ctb_column as select ts, c1, c3 from ct0 +sql create topic topic_ctb_all as select * from ct0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0 + +#sql create topic topic_ntb_column as select ts, c1, c3 from ntb +#sql create topic topic_ntb_all as select * from ntb +#sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb + +sql show tables +if $rows != 101 then + return -1 +endi + +print =============== run_back insert data + +if $loop_cnt == 0 then + run_back tsim/tmq/insertFixedDataV2.sim +else + run_back tsim/tmq/insertFixedDataV4.sim +endi + +#sleep 1000 + +#$rowNum = 1000 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +# +#$i = 0 +#while $i < $tbNum +# $tb = $tbPrefix . $i +# +# $x = 0 +# while $x < $rowNum +# $c = $x / 10 +# $c = $c * 10 +# $c = $x - $c +# +# $binary = ' . binary +# $binary = $binary . $c +# $binary = $binary . ' +# +# sql insert into $tb values ($tstart , $c , $x , $binary ) +# sql insert into ntb values ($tstart , $c , $x , $binary ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw +# +# $i = $i + 1 +# $tstart = 1640966400000 +#endw + +#root@trd02 /home $ tmq_sim --help +# -c Configuration directory, default is +# -d The name of the database for cosumer, no default +# -t The topic string for cosumer, no default +# -k The key-value string for cosumer, no default +# -g showMsgFlag, default is 0 +# + +$tbNum = 10 +$consumeDelay = 10 +$expectMsgCntFromCtb = 300 +$expectMsgCntFromStb = $expectMsgCntFromCtb * $tbNum +print consumeDelay: $consumeDelay +print insert data child num: $tbNum +print expectMsgCntFromCtb: $expectMsgCntFromCtb +print expectMsgCntFromStb: $expectMsgCntFromStb + +# supported key: +# group.id: +# enable.auto.commit: +# auto.offset.reset: +# td.connect.ip: +# td.connect.user:root +# td.connect.pass:taosdata +# td.connect.port:6030 +# td.connect.db:db + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $rowNum +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 0 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 0 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 0 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 0 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 0 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 0 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $totalMsgCnt +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $expectConsumeMsgCnt +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +##print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +##system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +##print cmd result----> $system_content +###if $system_content != @{consume success: 10000, 0}@ then +##if $system_content != success then +## return -1 +##endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#print cmd result----> $system_content +##if $system_content != @{consume success: 10000, 0}@ then +#if $system_content != success then +# return -1 +#endi + +if $loop_cnt == 0 then + $loop_cnt = 1 + $vgroups = 4 + $dbNamme = d1 + goto loop_vgroups +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim new file mode 100644 index 0000000000..0f38a8b58b --- /dev/null +++ b/tests/script/tsim/tmq/main2Con1Cgrp1TopicFrStb.sim @@ -0,0 +1,270 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +# scene1: vgroups=2, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene2: vgroups=2, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene3: vgroups=4, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene4: vgroups=4, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## +######## This test case include scene1 and scene3 +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$loop_cnt = 0 +$vgroups = 2 +$dbNamme = d0 +loop_vgroups: +print =============== create database $dbNamme vgroups $vgroups +sql create database $dbNamme vgroups $vgroups +sql show databases +print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 +print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19 +print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 + +if $loop_cnt == 0 then + if $rows != 2 then + return -1 + endi + if $data02 != 2 then # vgroups + print vgroups: $data02 + return -1 + endi +else + if $rows != 3 then + return -1 + endi + if $data00 == d1 then + if $data02 != 4 then # vgroups + print vgroups: $data02 + return -1 + endi + else + if $data12 != 4 then # vgroups + print vgroups: $data12 + return -1 + endi + endi +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +$tbPrefix = ct +$tbNum = 100 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using stb tags( $i ) + $i = $i + 1 +endw + +print =============== create normal table +sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +print =============== create multi topics. notes: now only support: +print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb +print =============== will support: * from stb + +sql create topic topic_stb_column as select ts, c1, c3 from stb +#sql create topic topic_stb_all as select * from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +sql create topic topic_ctb_column as select ts, c1, c3 from ct0 +sql create topic topic_ctb_all as select * from ct0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0 + +sql create topic topic_ntb_column as select ts, c1, c3 from ntb +sql create topic topic_ntb_all as select * from ntb +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb + +sql show tables +if $rows != 101 then + return -1 +endi + +print =============== run_back insert data + +if $loop_cnt == 0 then + run_back tsim/tmq/insertFixedDataV2.sim +else + run_back tsim/tmq/insertFixedDataV4.sim +endi + +#sleep 1000 + +#$rowNum = 1000 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +# +#$i = 0 +#while $i < $tbNum +# $tb = $tbPrefix . $i +# +# $x = 0 +# while $x < $rowNum +# $c = $x / 10 +# $c = $c * 10 +# $c = $x - $c +# +# $binary = ' . binary +# $binary = $binary . $c +# $binary = $binary . ' +# +# sql insert into $tb values ($tstart , $c , $x , $binary ) +# sql insert into ntb values ($tstart , $c , $x , $binary ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw +# +# $i = $i + 1 +# $tstart = 1640966400000 +#endw + +#root@trd02 /home $ tmq_sim --help +# -c Configuration directory, default is +# -d The name of the database for cosumer, no default +# -t The topic string for cosumer, no default +# -k The key-value string for cosumer, no default +# -g showMsgFlag, default is 0 +# + +$tbNum = 10 +$consumeDelay = 10 +$expectMsgCntFromCtb = 300 +$expectMsgCntFromStb = $expectMsgCntFromCtb * $tbNum +print consumeDelay: $consumeDelay +print insert data child num: $tbNum +print expectMsgCntFromCtb: $expectMsgCntFromCtb +print expectMsgCntFromStb: $expectMsgCntFromStb + +# supported key: +# group.id: +# enable.auto.commit: +# auto.offset.reset: +# td.connect.ip: +# td.connect.user:root +# td.connect.pass:taosdata +# td.connect.port:6030 +# td.connect.db:db + +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $rowNum +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $totalMsgCnt +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#print cmd result----> $system_content +##if $system_content != @{consume success: 10000, 0}@ then +#if $system_content != success then +# return -1 +#endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +print cmd result----> $system_content +#if $system_content != @{consume success: 10000, 0}@ then +if $system_content != success then + return -1 +endi +if $loop_cnt == 0 then + $loop_cnt = 1 + $vgroups = 4 + $dbNamme = d1 + goto loop_vgroups +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim new file mode 100644 index 0000000000..99d1c6e442 --- /dev/null +++ b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim @@ -0,0 +1,265 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +# scene1: vgroups=2, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene2: vgroups=2, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene3: vgroups=4, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene4: vgroups=4, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## +######## This test case include scene1 and scene3 +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$loop_cnt = 0 +$vgroups = 2 +$dbNamme = d0 +loop_vgroups: +print =============== create database $dbNamme vgroups $vgroups +sql create database $dbNamme vgroups $vgroups +sql show databases +print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 +print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19 +print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 + +if $loop_cnt == 0 then + if $rows != 2 then + return -1 + endi + if $data02 != 2 then # vgroups + print vgroups: $data02 + return -1 + endi +else + if $rows != 3 then + return -1 + endi + if $data00 == d1 then + if $data02 != 4 then # vgroups + print vgroups: $data02 + return -1 + endi + else + if $data12 != 4 then # vgroups + print vgroups: $data12 + return -1 + endi + endi +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +$tbPrefix = ct +$tbNum = 100 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using stb tags( $i ) + $i = $i + 1 +endw + +print =============== create normal table +sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +print =============== create topics from child table + +sql create topic topic_ctb_column as select ts, c1, c3 from ct0 +sql create topic topic_ctb_all as select * from ct0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0 + +#sql create topic topic_ntb_column as select ts, c1, c3 from ntb +#sql create topic topic_ntb_all as select * from ntb +#sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb + +sql show tables +if $rows != 101 then + return -1 +endi + +print =============== run_back insert data + +if $loop_cnt == 0 then + run_back tsim/tmq/insertFixedDataV2.sim +else + run_back tsim/tmq/insertFixedDataV4.sim +endi + +#sleep 1000 + +#$rowNum = 1000 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +# +#$i = 0 +#while $i < $tbNum +# $tb = $tbPrefix . $i +# +# $x = 0 +# while $x < $rowNum +# $c = $x / 10 +# $c = $c * 10 +# $c = $x - $c +# +# $binary = ' . binary +# $binary = $binary . $c +# $binary = $binary . ' +# +# sql insert into $tb values ($tstart , $c , $x , $binary ) +# sql insert into ntb values ($tstart , $c , $x , $binary ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw +# +# $i = $i + 1 +# $tstart = 1640966400000 +#endw + +#root@trd02 /home $ tmq_sim --help +# -c Configuration directory, default is +# -d The name of the database for cosumer, no default +# -t The topic string for cosumer, no default +# -k The key-value string for cosumer, no default +# -g showMsgFlag, default is 0 +# + +$tbNum = 10 +$consumeDelay = 10 +$expectMsgCntFromCtb = 300 +$expectMsgCntFromStb = $expectMsgCntFromCtb * $tbNum +print consumeDelay: $consumeDelay +print insert data child num: $tbNum +print expectMsgCntFromCtb: $expectMsgCntFromCtb +print expectMsgCntFromStb: $expectMsgCntFromStb + +# supported key: +# group.id: +# enable.auto.commit: +# auto.offset.reset: +# td.connect.ip: +# td.connect.user:root +# td.connect.pass:taosdata +# td.connect.port:6030 +# td.connect.db:db + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $rowNum +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $totalMsgCnt +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 1 +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $expectConsumeMsgCnt +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +##print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +##system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +##print cmd result----> $system_content +###if $system_content != @{consume success: 10000, 0}@ then +##if $system_content != success then +## return -1 +##endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#print cmd result----> $system_content +##if $system_content != @{consume success: 10000, 0}@ then +#if $system_content != success then +# return -1 +#endi + +if $loop_cnt == 0 then + $loop_cnt = 1 + $vgroups = 4 + $dbNamme = d1 + goto loop_vgroups +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim new file mode 100644 index 0000000000..76ad75aead --- /dev/null +++ b/tests/script/tsim/tmq/main2Con1Cgrp2TopicFrStb.sim @@ -0,0 +1,270 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +# scene1: vgroups=2, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene2: vgroups=2, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene3: vgroups=4, one topic for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene4: vgroups=4, multi topics for two consumers, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## +######## This test case include scene1 and scene3 +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$loop_cnt = 0 +$vgroups = 2 +$dbNamme = d0 +loop_vgroups: +print =============== create database $dbNamme vgroups $vgroups +sql create database $dbNamme vgroups $vgroups +sql show databases +print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 +print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19 +print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 + +if $loop_cnt == 0 then + if $rows != 2 then + return -1 + endi + if $data02 != 2 then # vgroups + print vgroups: $data02 + return -1 + endi +else + if $rows != 3 then + return -1 + endi + if $data00 == d1 then + if $data02 != 4 then # vgroups + print vgroups: $data02 + return -1 + endi + else + if $data12 != 4 then # vgroups + print vgroups: $data12 + return -1 + endi + endi +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +$tbPrefix = ct +$tbNum = 100 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using stb tags( $i ) + $i = $i + 1 +endw + +print =============== create normal table +sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +print =============== create multi topics. notes: now only support: +print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb +print =============== will support: * from stb + +sql create topic topic_stb_column as select ts, c1, c3 from stb +#sql create topic topic_stb_all as select * from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +sql create topic topic_ctb_column as select ts, c1, c3 from ct0 +sql create topic topic_ctb_all as select * from ct0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0 + +sql create topic topic_ntb_column as select ts, c1, c3 from ntb +sql create topic topic_ntb_all as select * from ntb +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb + +sql show tables +if $rows != 101 then + return -1 +endi + +print =============== run_back insert data + +if $loop_cnt == 0 then + run_back tsim/tmq/insertFixedDataV2.sim +else + run_back tsim/tmq/insertFixedDataV4.sim +endi + +#sleep 1000 + +#$rowNum = 1000 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +# +#$i = 0 +#while $i < $tbNum +# $tb = $tbPrefix . $i +# +# $x = 0 +# while $x < $rowNum +# $c = $x / 10 +# $c = $c * 10 +# $c = $x - $c +# +# $binary = ' . binary +# $binary = $binary . $c +# $binary = $binary . ' +# +# sql insert into $tb values ($tstart , $c , $x , $binary ) +# sql insert into ntb values ($tstart , $c , $x , $binary ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw +# +# $i = $i + 1 +# $tstart = 1640966400000 +#endw + +#root@trd02 /home $ tmq_sim --help +# -c Configuration directory, default is +# -d The name of the database for cosumer, no default +# -t The topic string for cosumer, no default +# -k The key-value string for cosumer, no default +# -g showMsgFlag, default is 0 +# + +$tbNum = 10 +$consumeDelay = 10 +$expectMsgCntFromCtb = 300 +$expectMsgCntFromStb = $expectMsgCntFromCtb * $tbNum +print consumeDelay: $consumeDelay +print insert data child num: $tbNum +print expectMsgCntFromCtb: $expectMsgCntFromCtb +print expectMsgCntFromStb: $expectMsgCntFromStb + +# supported key: +# group.id: +# enable.auto.commit: +# auto.offset.reset: +# td.connect.ip: +# td.connect.user:root +# td.connect.pass:taosdata +# td.connect.port:6030 +# td.connect.db:db + +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $rowNum +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_column" -k1 "group.id:tg2" -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_all" -k1 "group.id:tg2" -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ctb_function" -k1 "group.id:tg2" -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#$expect_result = @{consume success: @ +#$expect_result = $expect_result . $totalMsgCnt +#$expect_result = $expect_result . @, @ +#$expect_result = $expect_result . 0} +#print expect_result----> $expect_result +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_column" -k1 "group.id:tg2" -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_all" -k1 "group.id:tg2" -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi +# +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_ntb_function" -k1 "group.id:tg2" -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +#print cmd result----> $system_content +#if $system_content != success then +# return -1 +#endi + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $expectMsgCntFromStb +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 1 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 1 +print cmd result----> $system_content +if $system_content != success then + return -1 +endi + +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 1 +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_all" -k1 "group.id:tg2" -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 1 +#print cmd result----> $system_content +##if $system_content != @{consume success: 10000, 0}@ then +#if $system_content != success then +# return -1 +#endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 1 +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column" -k1 "group.id:tg2" -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 1 +print cmd result----> $system_content +#if $system_content != @{consume success: 10000, 0}@ then +if $system_content != success then + return -1 +endi +if $loop_cnt == 0 then + $loop_cnt = 1 + $vgroups = 4 + $dbNamme = d1 + goto loop_vgroups +endi + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim b/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim index 0df7a8ba57..095809abe9 100644 --- a/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim +++ b/tests/script/tsim/tmq/mainConsumerInMultiTopic.sim @@ -120,9 +120,9 @@ endi print =============== run_back insert data if $loop_cnt == 0 then - run_back tsim/tmq/insertDataV1.sim + run_back tsim/tmq/insertFixedDataV2.sim else - run_back tsim/tmq/insertDataV4.sim + run_back tsim/tmq/insertFixedDataV4.sim endi #sleep 1000 @@ -162,9 +162,16 @@ endi # -g showMsgFlag, default is 0 # -$consumeDelay = 50 -$consumeMsgCntFromTopic = 1000 -print consumeMsgCntFromTopic: $consumeMsgCntFromTopic , consumeDelay: $consumeDelay +$tbNum = 10 +$consumeDelay = 5 +$expectMsgCntFromCtb = 1000 +$expectMsgCntFromNtb = 1000 +$expectMsgCntFromStb = $expectMsgCntFromCtb * $tbNum +print consumeDelay: $consumeDelay +print insert data child num: $tbNum +print expectMsgCntFromCtb: $expectMsgCntFromCtb +print expectMsgCntFromStb: $expectMsgCntFromStb + # supported key: # group.id: @@ -177,49 +184,50 @@ print consumeMsgCntFromTopic: $consumeMsgCntFromTopic , consumeDelay: $consumeDe # td.connect.db:db $numOfTopics = 2 -$expectConsumeMsgCnt = $consumeMsgCntFromTopic * $numOfTopics +$expectMsgCntFromStb = $expectMsgCntFromStb * $numOfTopics $expect_result = @{consume success: @ -$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . $expectMsgCntFromStb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column, topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb print cmd result----> $system_content #if $system_content != @{consume success: 20000, 0}@ then -if $system_content < $expect_result then +if $system_content != $expect_result then return -1 endi $numOfTopics = 3 -$expectConsumeMsgCnt = $consumeMsgCntFromTopic * $numOfTopics +$expectMsgCntFromCtb = $expectMsgCntFromCtb * $numOfTopics $expect_result = @{consume success: @ -$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . $expectMsgCntFromCtb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content #if $system_content != @{consume success: 300, 0}@ then -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi $numOfTopics = 3 -$expectConsumeMsgCnt = $consumeMsgCntFromTopic * $numOfTopics +$expectMsgCntFromNtb = $expectMsgCntFromNtb * $tbNum +$expectMsgCntFromNtb = $expectMsgCntFromNtb * $numOfTopics $expect_result = @{consume success: @ -$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . $expectMsgCntFromNtb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_all, topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_all, topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_all, topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromNtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_all, topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromNtb print cmd result----> $system_content #if $system_content != @{consume success: 30000, 0}@ then -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi diff --git a/tests/script/tsim/tmq/mainConsumerInOneTopic.sim b/tests/script/tsim/tmq/mainConsumerInOneTopic.sim index b9a867921e..fc20e2db23 100644 --- a/tests/script/tsim/tmq/mainConsumerInOneTopic.sim +++ b/tests/script/tsim/tmq/mainConsumerInOneTopic.sim @@ -120,9 +120,9 @@ endi print =============== run_back insert data if $loop_cnt == 0 then - run_back tsim/tmq/insertDataV1.sim + run_back tsim/tmq/insertFixedDataV2.sim else - run_back tsim/tmq/insertDataV4.sim + run_back tsim/tmq/insertFixedDataV4.sim endi #sleep 1000 @@ -162,9 +162,15 @@ endi # -g showMsgFlag, default is 0 # -$consumeDelay = 50 -$expectConsumeMsgCnt = 1000 -print expectConsumeMsgCnt: $expectConsumeMsgCnt , consumeDelay: $consumeDelay +$tbNum = 10 +$consumeDelay = 5 +$expectMsgCntFromCtb = 1000 +$expectMsgCntFromStb = $expectMsgCntFromCtb * $tbNum +print consumeDelay: $consumeDelay +print insert data child num: $tbNum +print expectMsgCntFromCtb: $expectMsgCntFromCtb +print expectMsgCntFromStb: $expectMsgCntFromStb + # supported key: # group.id: @@ -177,82 +183,82 @@ print expectConsumeMsgCnt: $expectConsumeMsgCnt , consumeDelay: $consumeDelay # td.connect.db:db $expect_result = @{consume success: @ -$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . $expectMsgCntFromStb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi -#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb #print cmd result----> $system_content ##if $system_content != @{consume success: 10000, 0}@ then -#if $system_content < $expectConsumeMsgCnt then +#if $system_content != $expect_result then # return -1 #endi -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb print cmd result----> $system_content #if $system_content != @{consume success: 10000, 0}@ then -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi $expect_result = @{consume success: @ -$expect_result = $expect_result . $rowNum +$expect_result = $expect_result . $expectMsgCntFromCtb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi $expect_result = @{consume success: @ -$expect_result = $expect_result . $totalMsgCnt +$expect_result = $expect_result . $expectMsgCntFromStb $expect_result = $expect_result . @, @ $expect_result = $expect_result . 0} print expect_result----> $expect_result -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi -print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt -system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb print cmd result----> $system_content -if $system_content < $expectConsumeMsgCnt then +if $system_content != $expect_result then return -1 endi diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 702d88f258..86d0bef1a9 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -34,14 +34,23 @@ #define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_ROW_STR_LEN (16 * 1024) +typedef struct { + int32_t expectMsgCnt; + int32_t consumeMsgCnt; + TdThread thread; +} SThreadInfo; + typedef struct { // input from argvs char dbName[32]; char topicString[256]; char keyString[1024]; + char topicString1[256]; + char keyString1[1024]; int32_t showMsgFlag; int32_t consumeDelay; // unit s int32_t consumeMsgCnt; + int32_t checkMode; // save result after parse agrvs int32_t numOfTopic; @@ -50,6 +59,13 @@ typedef struct { int32_t numOfKey; char key[32][64]; char value[32][64]; + + int32_t numOfTopic1; + char topics1[32][64]; + + int32_t numOfKey1; + char key1[32][64]; + char value1[32][64]; } SConfInfo; static SConfInfo g_stConfInfo; @@ -69,12 +85,18 @@ static void printHelp() { printf("%s%s%s\n", indent, indent, "The topic string for cosumer, no default "); printf("%s%s\n", indent, "-k"); printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default "); + printf("%s%s\n", indent, "-t1"); + printf("%s%s%s\n", indent, indent, "The topic1 string for cosumer, no default "); + printf("%s%s\n", indent, "-k1"); + printf("%s%s%s\n", indent, indent, "The key1-value1 string for cosumer, no default "); printf("%s%s\n", indent, "-g"); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s\n", indent, "-y"); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); printf("%s%s\n", indent, "-m"); printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt); + printf("%s%s\n", indent, "-j"); + printf("%s%s%s%d\n", indent, indent, "check mode, default is s", g_stConfInfo.checkMode); exit(EXIT_SUCCESS); } @@ -96,12 +118,18 @@ void parseArgument(int32_t argc, char* argv[]) { strcpy(g_stConfInfo.topicString, argv[++i]); } else if (strcmp(argv[i], "-k") == 0) { strcpy(g_stConfInfo.keyString, argv[++i]); + } else if (strcmp(argv[i], "-t1") == 0) { + strcpy(g_stConfInfo.topicString1, argv[++i]); + } else if (strcmp(argv[i], "-k1") == 0) { + strcpy(g_stConfInfo.keyString1, argv[++i]); } else if (strcmp(argv[i], "-g") == 0) { g_stConfInfo.showMsgFlag = atol(argv[++i]); } else if (strcmp(argv[i], "-y") == 0) { g_stConfInfo.consumeDelay = atol(argv[++i]); } else if (strcmp(argv[i], "-m") == 0) { g_stConfInfo.consumeMsgCnt = atol(argv[++i]); + } else if (strcmp(argv[i], "-j") == 0) { + g_stConfInfo.checkMode = atol(argv[++i]); } else { printf("%s unknow para: %s %s", GREEN, argv[++i], NC); exit(-1); @@ -154,7 +182,18 @@ void parseInputString() { ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); g_stConfInfo.numOfTopic++; + + token = strtok(NULL, delim); + } + token = strtok(g_stConfInfo.topicString1, delim); + while(token != NULL) { + //printf("%s\n", token ); + strcpy(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1], token); + ltrim(g_stConfInfo.topics1[g_stConfInfo.numOfTopic1]); + //printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + g_stConfInfo.numOfTopic1++; + token = strtok(NULL, delim); } @@ -171,11 +210,28 @@ void parseInputString() { // g_stConfInfo.value[g_stConfInfo.numOfKey]); g_stConfInfo.numOfKey++; } + + token = strtok(NULL, delim); + } + token = strtok(g_stConfInfo.keyString1, delim); + while(token != NULL) { + //printf("%s\n", token ); + { + char* pstr = token; + ltrim(pstr); + char *ret = strchr(pstr, ch); + memcpy(g_stConfInfo.key1[g_stConfInfo.numOfKey1], pstr, ret-pstr); + strcpy(g_stConfInfo.value1[g_stConfInfo.numOfKey1], ret+1); + //printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]); + g_stConfInfo.numOfKey1++; + } + token = strtok(NULL, delim); } } + static int running = 1; /*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ @@ -225,6 +281,40 @@ tmq_list_t* build_topic_list() { return topic_list; } + +tmq_t* build_consumer_x() { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + exit(-1); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + //tmq_conf_set(conf, "group.id", "tg2"); + for (int32_t i = 0; i < g_stConfInfo.numOfKey1; i++) { + tmq_conf_set(conf, g_stConfInfo.key1[i], g_stConfInfo.value1[i]); + } + tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + return tmq; +} + +tmq_list_t* build_topic_list_x() { + tmq_list_t* topic_list = tmq_list_new(); + //tmq_list_append(topic_list, "test_stb_topic_1"); + for (int32_t i = 0; i < g_stConfInfo.numOfTopic1; i++) { + tmq_list_append(topic_list, g_stConfInfo.topics1[i]); + } + return topic_list; +} + void loop_consume(tmq_t* tmq) { tmq_resp_err_t err; @@ -262,7 +352,7 @@ void loop_consume(tmq_t* tmq) { printf("{consume success: %d, %d}", totalMsgs, totalRows); } -void parallel_consume(tmq_t* tmq) { +int32_t parallel_consume(tmq_t* tmq, int threadLable) { tmq_resp_err_t err; int32_t totalMsgs = 0; @@ -273,7 +363,9 @@ void parallel_consume(tmq_t* tmq) { if (tmqMsg) { totalMsgs++; -#if 0 + //printf("threadFlag: %d, totalMsgs: %d\n", threadLable, totalMsgs); + + #if 0 TAOS_ROW row; while (NULL != (row = tmq_get_row(tmqMsg))) { totalRows++; @@ -300,13 +392,65 @@ void parallel_consume(tmq_t* tmq) { exit(-1); } - printf("%d", totalMsgs); // output to sim for check result + //printf("%d", totalMsgs); // output to sim for check result + return totalMsgs; } + +void *threadFunc(void *param) { + int32_t totalMsgs = 0; + + SThreadInfo *pInfo = (SThreadInfo *)param; + + tmq_t* tmq = build_consumer_x(); + tmq_list_t* topic_list = build_topic_list_x(); + if ((NULL == tmq) || (NULL == topic_list)){ + return NULL; + } + + tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); + if (err) { + printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + + //if (0 == g_stConfInfo.consumeMsgCnt) { + // loop_consume(tmq); + //} else { + pInfo->consumeMsgCnt = parallel_consume(tmq, 1); + //} + + err = tmq_unsubscribe(tmq); + if (err) { + printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); + pInfo->consumeMsgCnt = -1; + return NULL; + } + + return NULL; +} + + int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); parseInputString(); + int32_t numOfThreads = 1; + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + SThreadInfo *pInfo = (SThreadInfo *)taosMemoryCalloc(numOfThreads, sizeof(SThreadInfo)); + + if (g_stConfInfo.numOfTopic1) { + // pthread_create one thread to consume + for (int32_t i = 0; i < numOfThreads; ++i) { + pInfo[i].expectMsgCnt = 0; + pInfo[i].consumeMsgCnt = 0; + taosThreadCreate(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); + } + } + + int32_t totalMsgs = 0; tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); if ((NULL == tmq) || (NULL == topic_list)) { @@ -319,10 +463,10 @@ int main(int32_t argc, char* argv[]) { exit(-1); } - if (0 == g_stConfInfo.consumeMsgCnt) { + if (0 == g_stConfInfo.numOfTopic1) { loop_consume(tmq); } else { - parallel_consume(tmq); + totalMsgs = parallel_consume(tmq, 0); } err = tmq_unsubscribe(tmq); @@ -331,6 +475,27 @@ int main(int32_t argc, char* argv[]) { exit(-1); } + if (g_stConfInfo.numOfTopic1) { + for (int32_t i = 0; i < numOfThreads; i++) { + taosThreadJoin(pInfo[i].thread, NULL); + } + + //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + if (0 == g_stConfInfo.checkMode) { + if ((totalMsgs + pInfo->consumeMsgCnt) == g_stConfInfo.consumeMsgCnt) { + printf("success"); + } else { + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } else if (1 == g_stConfInfo.checkMode) { + if ((totalMsgs == g_stConfInfo.consumeMsgCnt) && (pInfo->consumeMsgCnt == g_stConfInfo.consumeMsgCnt)) { + printf("success"); + } else { + printf("fail, consumer msg cnt: %d, %d", totalMsgs, pInfo->consumeMsgCnt); + } + } + } + return 0; } From 6c1774524a1b46e072527e7bfd5ac4f073c95363 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 13 Apr 2022 18:53:53 +0800 Subject: [PATCH 2/3] test:add test cases --- tests/script/jenkins/basic.txt | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index c14d4fe27b..730937333b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -59,6 +59,15 @@ ./test.sh -f tsim/tmq/oneTopic.sim ./test.sh -f tsim/tmq/multiTopic.sim +./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim +./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim + +#fail ./test.sh -f tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim +#fail ./test.sh -f tsim/tmq/main2Con1Cgrp1TopicFrStb.sim +./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim +./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrStb.sim + + # --- stable ./test.sh -f tsim/stable/disk.sim ./test.sh -f tsim/stable/dnode3.sim From 13735d725715922aba2a8881ace4c52a992f635c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 13 Apr 2022 19:04:25 +0800 Subject: [PATCH 3/3] feat(query): distributed splitting of child/normal table JOIN --- include/libs/nodes/nodes.h | 1 + source/libs/executor/src/scanoperator.c | 2 +- source/libs/nodes/src/nodesTraverseFuncs.c | 50 ++-- source/libs/parser/src/parAstCreater.c | 22 +- .../libs/parser/test/mockCatalogService.cpp | 11 +- source/libs/planner/src/planLogicCreater.c | 1 - source/libs/planner/src/planOptimizer.c | 244 ++++++++++++++++-- source/libs/planner/src/planSpliter.c | 147 ++++++++--- source/libs/planner/test/plannerTest.cpp | 16 +- 9 files changed, 396 insertions(+), 98 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3a1d7954a7..4ffacf3787 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -231,6 +231,7 @@ typedef enum EDealRes { DEAL_RES_CONTINUE = 1, DEAL_RES_IGNORE_CHILD, DEAL_RES_ERROR, + DEAL_RES_END } EDealRes; typedef EDealRes (*FNodeWalker)(SNode* pNode, void* pContext); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4179999c4b..5079a49a62 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -548,7 +548,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) { char* dbName = nodesGetValueFromNode(node); strncpy(pContext, varDataVal(dbName), varDataLen(dbName)); *((char*)pContext + varDataLen(dbName)) = 0; - return DEAL_RES_ERROR; // stop walk + return DEAL_RES_END; // stop walk } default: break; diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 4a782cce08..99a08923bb 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -46,7 +46,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_OPERATOR: { SOperatorNode* pOpNode = (SOperatorNode*)pNode; res = walkNode(pOpNode->pLeft, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pOpNode->pRight, order, walker, pContext); } break; @@ -63,10 +63,10 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; res = walkNode(pJoinTableNode->pLeft, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pJoinTableNode->pRight, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pJoinTableNode->pOnCond, order, walker, pContext); } break; @@ -80,7 +80,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_STATE_WINDOW: { SStateWindowNode* pState = (SStateWindowNode*)pNode; res = walkNode(pState->pExpr, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pState->pCol, order, walker, pContext); } break; @@ -88,7 +88,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_SESSION_WINDOW: { SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; res = walkNode(pSession->pCol, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pSession->pGap, order, walker, pContext); } break; @@ -96,16 +96,16 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker case QUERY_NODE_INTERVAL_WINDOW: { SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; res = walkNode(pInterval->pInterval, order, walker, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pOffset, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pSliding, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pFill, order, walker, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkNode(pInterval->pCol, order, walker, pContext); } break; @@ -126,7 +126,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker break; } - if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) { res = walker(pNode, pContext); } @@ -136,8 +136,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext) { SNode* node; FOREACH(node, pNodeList) { - if (DEAL_RES_ERROR == walkNode(node, order, walker, pContext)) { - return DEAL_RES_ERROR; + EDealRes res = walkNode(node, order, walker, pContext); + if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { + return res; } } return DEAL_RES_CONTINUE; @@ -185,7 +186,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_OPERATOR: { SOperatorNode* pOpNode = (SOperatorNode*)pNode; res = rewriteNode(&(pOpNode->pLeft), order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pOpNode->pRight), order, rewriter, pContext); } break; @@ -202,10 +203,10 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; res = rewriteNode(&(pJoinTableNode->pLeft), order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pJoinTableNode->pRight), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pJoinTableNode->pOnCond), order, rewriter, pContext); } break; @@ -219,7 +220,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_STATE_WINDOW: { SStateWindowNode* pState = (SStateWindowNode*)pNode; res = rewriteNode(&pState->pExpr, order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&pState->pCol, order, rewriter, pContext); } break; @@ -227,7 +228,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_SESSION_WINDOW: { SSessionWindowNode* pSession = (SSessionWindowNode*)pNode; res = rewriteNode(&pSession->pCol, order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&pSession->pGap, order, rewriter, pContext); } break; @@ -235,16 +236,16 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit case QUERY_NODE_INTERVAL_WINDOW: { SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pNode; res = rewriteNode(&(pInterval->pInterval), order, rewriter, pContext); - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pOffset), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pSliding), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext); } - if (DEAL_RES_ERROR != res) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext); } break; @@ -265,7 +266,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit break; } - if (DEAL_RES_ERROR != res && TRAVERSAL_POSTORDER == order) { + if (DEAL_RES_ERROR != res && DEAL_RES_END != res && TRAVERSAL_POSTORDER == order) { res = rewriter(pRawNode, pContext); } @@ -275,8 +276,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit static EDealRes rewriteList(SNodeList* pNodeList, ETraversalOrder order, FNodeRewriter rewriter, void* pContext) { SNode** pNode; FOREACH_FOR_REWRITE(pNode, pNodeList) { - if (DEAL_RES_ERROR == rewriteNode(pNode, order, rewriter, pContext)) { - return DEAL_RES_ERROR; + EDealRes res = rewriteNode(pNode, order, rewriter, pContext); + if (DEAL_RES_ERROR == res || DEAL_RES_END == res) { + return res; } } return DEAL_RES_CONTINUE; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 318f7ca5f7..9a26e38c98 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -328,8 +328,26 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ CHECK_OUT_OF_MEM(cond); cond->condType = type; cond->pParameterList = nodesMakeList(); - nodesListAppend(cond->pParameterList, pParam1); - nodesListAppend(cond->pParameterList, pParam2); + if ((QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type != ((SLogicConditionNode*)pParam1)->condType) || + (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type != ((SLogicConditionNode*)pParam2)->condType)) { + nodesListAppend(cond->pParameterList, pParam1); + nodesListAppend(cond->pParameterList, pParam2); + } else { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1)) { + nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList); + ((SLogicConditionNode*)pParam1)->pParameterList = NULL; + nodesDestroyNode(pParam1); + } else { + nodesListAppend(cond->pParameterList, pParam1); + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2)) { + nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList); + ((SLogicConditionNode*)pParam2)->pParameterList = NULL; + nodesDestroyNode(pParam2); + } else { + nodesListAppend(cond->pParameterList, pParam2); + } + } return (SNode*)cond; } diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 402caeb252..3da3678563 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -146,6 +146,7 @@ public: meta_[db][tbname].reset(new MockTableMeta()); meta_[db][tbname]->schema = table.release(); meta_[db][tbname]->schema->uid = id_++; + meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE; SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,}; addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030); @@ -197,11 +198,11 @@ public: std::cout << "Table:" << table.first << std::endl; std::cout << SH("Field") << SH("Type") << SH("DataType") << IH("Bytes") << std::endl; std::cout << SL(3, 1) << std::endl; - int16_t numOfTags = schema->tableInfo.numOfTags; - int16_t numOfFields = numOfTags + schema->tableInfo.numOfColumns; + int16_t numOfColumns = schema->tableInfo.numOfColumns; + int16_t numOfFields = numOfColumns + schema->tableInfo.numOfTags; for (int16_t i = 0; i < numOfFields; ++i) { const SSchema* col = schema->schema + i; - std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfTags)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl; + std::cout << SF(std::string(col->name)) << SH(ftToString(i, numOfColumns)) << SH(dtToString(col->type)) << IF(col->bytes) << std::endl; } std::cout << std::endl; } @@ -262,8 +263,8 @@ private: return tDataTypes[type].name; } - std::string ftToString(int16_t colid, int16_t numOfTags) const { - return (0 == colid ? "column" : (colid <= numOfTags ? "tag" : "column")); + std::string ftToString(int16_t colid, int16_t numOfColumns) const { + return (0 == colid ? "column" : (colid <= numOfColumns ? "tag" : "column")); } STableMeta* getTableSchemaMeta(const std::string& db, const std::string& tbname) const { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 1d8400e1eb..3c60b62434 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -694,7 +694,6 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS } return code; - return TSDB_CODE_SUCCESS; } static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index c56e113d40..19e6718fe8 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -41,6 +41,21 @@ typedef struct SOsdInfo { SNodeList* pDsoFuncs; } SOsdInfo; +typedef struct SCpdIsMultiTableCondCxt { + SNodeList* pLeftCols; + SNodeList* pRightCols; + bool havaLeftCol; + bool haveRightCol; +} SCpdIsMultiTableCondCxt; + +typedef enum ECondAction { + COND_ACTION_STAY = 1, + COND_ACTION_PUSH_JOIN, + COND_ACTION_PUSH_LEFT_CHILD, + COND_ACTION_PUSH_RIGHT_CHILD + // after supporting outer join, there are other possibilities +} ECondAction; + static bool osdMayBeOptimized(SLogicNode* pNode) { if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OSD)) { return false; @@ -152,34 +167,227 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* return TSDB_CODE_SUCCESS; } -static int32_t cpdPartitionCondition(SJoinLogicNode* pJoin, SNodeList** pMultiTableCond, SNodeList** pSingleTableCond) { - // todo +static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) { + SNode* pTableCol = NULL; + FOREACH(pTableCol, pTableCols) { + if (nodesEqualNode(pCondCol, pTableCol)) { + return true; + } + } + return false; +} + +static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) { + SCpdIsMultiTableCondCxt* pCxt = pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (belongThisTable(pNode, pCxt->pLeftCols)) { + pCxt->havaLeftCol = true; + } else if (belongThisTable(pNode, pCxt->pRightCols)) { + pCxt->haveRightCol = true; + } + return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE; + } + return DEAL_RES_CONTINUE; +} + +static ECondAction cpdCondAction(EJoinType joinType, SNodeList* pLeftCols, SNodeList* pRightCols, SNode* pNode) { + SCpdIsMultiTableCondCxt cxt = { .pLeftCols = pLeftCols, .pRightCols = pRightCols, .havaLeftCol = false, .haveRightCol = false }; + nodesWalkExpr(pNode, cpdIsMultiTableCondImpl, &cxt); + return (JOIN_TYPE_INNER != joinType ? COND_ACTION_STAY : + (cxt.havaLeftCol && cxt.haveRightCol ? COND_ACTION_PUSH_JOIN : (cxt.havaLeftCol ? COND_ACTION_PUSH_LEFT_CHILD : COND_ACTION_PUSH_RIGHT_CHILD))); +} + +static int32_t cpdMakeCond(SNodeList** pConds, SNode** pCond) { + if (NULL == *pConds) { + return TSDB_CODE_SUCCESS; + } + + if (1 == LIST_LENGTH(*pConds)) { + *pCond = nodesListGetNode(*pConds, 0); + nodesClearList(*pConds); + } else { + SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->condType = LOGIC_COND_TYPE_AND; + pLogicCond->pParameterList = *pConds; + *pCond = (SNode*)pLogicCond; + } + *pConds = NULL; + return TSDB_CODE_SUCCESS; } -static int32_t cpdPushJoinCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pMultiTableCond) { - // todo +static int32_t cpdPartitionLogicCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pJoin->node.pConditions; + if (LOGIC_COND_TYPE_AND != pLogicCond->condType) { + return TSDB_CODE_SUCCESS; + } + + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + int32_t code = TSDB_CODE_SUCCESS; + + SNodeList* pOnConds = NULL; + SNodeList* pLeftChildConds = NULL; + SNodeList* pRightChildConds = NULL; + SNodeList* pRemainConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pCond); + if (COND_ACTION_PUSH_JOIN == condAction) { + code = nodesListMakeAppend(&pOnConds, nodesCloneNode(pCond)); + } else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) { + code = nodesListMakeAppend(&pLeftChildConds, nodesCloneNode(pCond)); + } else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) { + code = nodesListMakeAppend(&pRightChildConds, nodesCloneNode(pCond)); + } else { + code = nodesListMakeAppend(&pRemainConds, nodesCloneNode(pCond)); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + SNode* pTempOnCond = NULL; + SNode* pTempLeftChildCond = NULL; + SNode* pTempRightChildCond = NULL; + SNode* pTempRemainCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pOnConds, &pTempOnCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pLeftChildConds, &pTempLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pRightChildConds, &pTempRightChildCond); + } + if (TSDB_CODE_SUCCESS == code) { + code = cpdMakeCond(&pRemainConds, &pTempRemainCond); + } + + if (TSDB_CODE_SUCCESS == code) { + *pOnCond = pTempOnCond; + *pLeftChildCond = pTempLeftChildCond; + *pRightChildCond = pTempRightChildCond; + nodesDestroyNode(pJoin->node.pConditions); + pJoin->node.pConditions = pTempRemainCond; + } else { + nodesDestroyList(pOnConds); + nodesDestroyList(pLeftChildConds); + nodesDestroyList(pRightChildConds); + nodesDestroyList(pRemainConds); + nodesDestroyNode(pTempOnCond); + nodesDestroyNode(pTempLeftChildCond); + nodesDestroyNode(pTempRightChildCond); + nodesDestroyNode(pTempRemainCond); + } + + return code; +} + +static int32_t cpdPartitionOpCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + ECondAction condAction = cpdCondAction(pJoin->joinType, pLeftCols, pRightCols, pJoin->node.pConditions); + if (COND_ACTION_STAY == condAction) { + return TSDB_CODE_SUCCESS; + } else if (COND_ACTION_PUSH_JOIN == condAction) { + *pOnCond = pJoin->node.pConditions; + } else if (COND_ACTION_PUSH_LEFT_CHILD == condAction) { + *pLeftChildCond = pJoin->node.pConditions; + } else if (COND_ACTION_PUSH_RIGHT_CHILD == condAction) { + *pRightChildCond = pJoin->node.pConditions; + } + pJoin->node.pConditions = NULL; return TSDB_CODE_SUCCESS; } -static int32_t cpdPushJoinCondToChildren(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNodeList* pSingleTableCond) { - // todo - return TSDB_CODE_SUCCESS; +static int32_t cpdPartitionCond(SJoinLogicNode* pJoin, SNode** pOnCond, SNode** pLeftChildCond, SNode** pRightChildCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->node.pConditions)) { + return cpdPartitionLogicCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); + } else { + return cpdPartitionOpCond(pJoin, pOnCond, pLeftChildCond, pRightChildCond); + } +} + +static int32_t cpdCondAppend(SOptimizeContext* pCxt, SNode** pCond, SNode** pAdditionalCond) { + if (NULL == *pCond) { + TSWAP(*pCond, *pAdditionalCond, SNode*); + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCond)) { + code = nodesListAppend(((SLogicConditionNode*)*pCond)->pParameterList, *pAdditionalCond); + if (TSDB_CODE_SUCCESS == code) { + *pAdditionalCond = NULL; + } + } else { + SLogicConditionNode* pLogicCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (NULL == pLogicCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pLogicCond->condType = LOGIC_COND_TYPE_AND; + code = nodesListMakeAppend(&pLogicCond->pParameterList, *pAdditionalCond); + if (TSDB_CODE_SUCCESS == code) { + *pAdditionalCond = NULL; + code = nodesListMakeAppend(&pLogicCond->pParameterList, *pCond); + } + if (TSDB_CODE_SUCCESS == code) { + *pCond = (SNode*)pLogicCond; + } else { + nodesDestroyNode(pLogicCond); + } + } + return code; +} + +static int32_t cpdPushCondToOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode** pCond) { + return cpdCondAppend(pCxt, &pJoin->pOnConditions, pCond); +} + +static int32_t cpdPushCondToScan(SOptimizeContext* pCxt, SScanLogicNode* pScan, SNode** pCond) { + return cpdCondAppend(pCxt, &pScan->node.pConditions, pCond); +} + +static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { + switch (nodeType(pChild)) { + case QUERY_NODE_LOGIC_PLAN_SCAN: + return cpdPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond); + default: + break; + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; } static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL != pJoin->node.pConditions) { - SNodeList* pMultiTableCond = NULL; - SNodeList* pSingleTableCond = NULL; - int32_t code = cpdPartitionCondition(pJoin, &pMultiTableCond, &pSingleTableCond); - if (TSDB_CODE_SUCCESS == code && NULL != pMultiTableCond) { - code = cpdPushJoinCondToOnCond(pCxt, pJoin, pMultiTableCond); - } - if (TSDB_CODE_SUCCESS == code && NULL != pSingleTableCond) { - code = cpdPushJoinCondToChildren(pCxt, pJoin, pSingleTableCond); - } + if (NULL == pJoin->node.pConditions) { + return TSDB_CODE_SUCCESS; } - return TSDB_CODE_SUCCESS; + + SNode* pOnCond = NULL; + SNode* pLeftChildCond = NULL; + SNode* pRightChildCond = NULL; + int32_t code = cpdPartitionCond(pJoin, &pOnCond, &pLeftChildCond, &pRightChildCond); + if (TSDB_CODE_SUCCESS == code && NULL != pOnCond) { + code = cpdPushCondToOnCond(pCxt, pJoin, &pOnCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) { + code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond); + } + if (TSDB_CODE_SUCCESS == code && NULL != pRightChildCond) { + code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pOnCond); + nodesDestroyNode(pLeftChildCond); + nodesDestroyNode(pRightChildCond); + } + + return code; } static int32_t cpdPushAggCondition(SOptimizeContext* pCxt, SAggLogicNode* pAgg) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 0b21052955..e54cf33934 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -18,6 +18,7 @@ #define SPLIT_FLAG_MASK(n) (1 << n) #define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0) +#define SPLIT_FLAG_CTJ SPLIT_FLAG_MASK(1) #define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask) #define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) @@ -39,43 +40,14 @@ typedef struct SStsInfo { SLogicSubplan* pSubplan; } SStsInfo; -static SLogicNode* stsMatchByNode(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && - NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild); - if (NULL != pSplitNode) { - return pSplitNode; - } - } - return NULL; -} +typedef struct SCtjInfo { + SScanLogicNode* pScan; + SLogicSubplan* pSubplan; +} SCtjInfo; -static void stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) { - SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode); - if (NULL != pSplitNode) { - pInfo->pScan = (SScanLogicNode*)pSplitNode; - pInfo->pSubplan = pSubplan; - } -} -static void stsMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStsInfo* pInfo) { - if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) { - stsFindSplitNode(pSubplan, pInfo); - } - SNode* pChild; - FOREACH(pChild, pSubplan->pChildren) { - stsMatch(pCxt, (SLogicSubplan*)pChild, pInfo); - if (NULL != pInfo->pScan) { - break; - } - } - return; -} +typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, SStsInfo* pInfo); -static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) { +static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) { SLogicSubplan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); if (NULL == pSubplan) { return NULL; @@ -84,11 +56,11 @@ static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan); TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*); - SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS); + SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, flag); return pSubplan; } -static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) { +static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, ESubplanType subplanType) { SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); if (NULL == pExchange) { return TSDB_CODE_OUT_OF_MEMORY; @@ -119,10 +91,48 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla return TSDB_CODE_FAILED; } +static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) { + if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) { + if (func(pSubplan, pInfo)) { + return true; + } + } + SNode* pChild; + FOREACH(pChild, pSubplan->pChildren) { + if (splMatch(pCxt, (SLogicSubplan*)pChild, flag, func, pInfo)) { + return true; + } + } + return false; +} + +static SLogicNode* stsMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && + NULL != ((SScanLogicNode*)pNode)->pVgroupList && ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups > 1) { + return pNode; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static bool stsFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) { + SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + pInfo->pScan = (SScanLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + } + return NULL != pSplitNode; +} + static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { SStsInfo info = {0}; - stsMatch(pCxt, pSubplan, &info); - if (NULL == info.pScan) { + if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_STS, stsFindSplitNode, &info)) { return TSDB_CODE_SUCCESS; } if (NULL == info.pSubplan->pChildren) { @@ -131,9 +141,61 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { return TSDB_CODE_OUT_OF_MEMORY; } } - int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, stsCreateScanSubplan(pCxt, info.pScan)); + int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_STS)); if (TSDB_CODE_SUCCESS == code) { - code = stsCreateExchangeNode(pCxt, info.pSubplan, info.pScan); + code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, SUBPLAN_TYPE_MERGE); + } + ++(pCxt->groupId); + pCxt->split = true; + return code; +} + +static bool ctjIsSingleTable(int8_t tableType) { + return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType); +} + +static SLogicNode* ctjMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) { + SLogicNode* pLeft = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); + SLogicNode* pRight = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1); + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pLeft) && ctjIsSingleTable(((SScanLogicNode*)pLeft)->pMeta->tableType) && + QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pRight) && ctjIsSingleTable(((SScanLogicNode*)pRight)->pMeta->tableType)) { + return pRight; + } + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = ctjMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static bool ctjFindSplitNode(SLogicSubplan* pSubplan, SStsInfo* pInfo) { + SLogicNode* pSplitNode = ctjMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + pInfo->pScan = (SScanLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + } + return NULL != pSplitNode; +} + +static int32_t ctjSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + SCtjInfo info = {0}; + if (!splMatch(pCxt, pSubplan, SPLIT_FLAG_CTJ, ctjFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + if (NULL == info.pSubplan->pChildren) { + info.pSubplan->pChildren = nodesMakeList(); + if (NULL == info.pSubplan->pChildren) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + int32_t code = nodesListStrictAppend(info.pSubplan->pChildren, splCreateScanSubplan(pCxt, info.pScan, SPLIT_FLAG_CTJ)); + if (TSDB_CODE_SUCCESS == code) { + code = splCreateExchangeNode(pCxt, info.pSubplan, info.pScan, info.pSubplan->subplanType); } ++(pCxt->groupId); pCxt->split = true; @@ -141,7 +203,8 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { } static const SSplitRule splitRuleSet[] = { - { .pName = "SuperTableScan", .splitFunc = stsSplit } + { .pName = "SuperTableScan", .splitFunc = stsSplit }, + { .pName = "ChildTableJoin", .splitFunc = ctjSplit }, }; static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index fd0084c01e..69c76fc3ae 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -70,6 +70,12 @@ protected: cout << "unformatted logic plan : " << endl; cout << toString((const SNode*)pLogicNode, false) << endl; + code = optimizeLogicPlan(&cxt, pLogicNode); + if (code != TSDB_CODE_SUCCESS) { + cout << "sql:[" << cxt_.pSql << "] optimizeLogicPlan code:" << code << ", strerror:" << tstrerror(code) << endl; + return false; + } + SLogicSubplan* pLogicSubplan = nullptr; code = splitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); if (code != TSDB_CODE_SUCCESS) { @@ -174,13 +180,13 @@ TEST_F(PlannerTest, selectStableBasic) { TEST_F(PlannerTest, selectJoin) { setDatabase("root", "test"); - bind("SELECT * FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); - ASSERT_TRUE(run()); + // bind("SELECT t1.c1, t2.c2 FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); + // ASSERT_TRUE(run()); - bind("SELECT * FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1"); - ASSERT_TRUE(run()); + // bind("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"); + // ASSERT_TRUE(run()); - bind("SELECT t1.* FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1"); + bind("SELECT t1.c1, t2.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and t2.c2 = 'qwe'"); ASSERT_TRUE(run()); }