328 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
			
		
		
	
	
			328 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
| #### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
 | |
| #basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
 | |
| #basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
 | |
| #basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
 | |
| #basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
 | |
| 
 | |
| # notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
 | |
| # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
 | |
| #
 | |
| # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
 | |
| #
 | |
| 
 | |
| run tsim/tmq/prepareBasicEnv-1vgrp.sim
 | |
| 
 | |
| #---- global parameters start ----#
 | |
| $dbName    = db
 | |
| $vgroups    = 1
 | |
| $stbPrefix  = stb
 | |
| $ctbPrefix  = ctb
 | |
| $ntbPrefix  = ntb
 | |
| $stbNum     = 1
 | |
| $ctbNum     = 10
 | |
| $ntbNum     = 10
 | |
| $rowsPerCtb = 10
 | |
| $tstart     = 1640966400000  # 2022-01-01 00:00:00.000
 | |
| #---- global parameters end ----#
 | |
| 
 | |
| $pullDelay    = 5
 | |
| $ifcheckdata  = 1
 | |
| $ifmanualcommit = 1
 | |
| $showMsg      = 1
 | |
| $showRow      = 0
 | |
| 
 | |
| sql connect
 | |
| sql use $dbName
 | |
| 
 | |
| print == alter database
 | |
| sql alter database $dbName wal_retention_period 3600
 | |
| 
 | |
| print == create topics from super table
 | |
| sql create topic topic_stb_column as select ts, c3 from stb
 | |
| sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
 | |
| sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
 | |
| 
 | |
| print == create topics from child table
 | |
| sql create topic topic_ctb_column as select ts, c3 from ctb0
 | |
| sql create topic topic_ctb_all as select * from ctb0
 | |
| sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
 | |
| 
 | |
| print == create topics from normal table
 | |
| sql create topic topic_ntb_column as select ts, c3 from ntb0
 | |
| sql create topic topic_ntb_all as select * from ntb0
 | |
| sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
 | |
| 
 | |
| #sql show topics
 | |
| #if $rows != 9 then 
 | |
| #  return -1
 | |
| #endi
 | |
| 
 | |
| #'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
 | |
| $keyList = ' . group.id:cgrp1
 | |
| $keyList = $keyList . ,
 | |
| $keyList = $keyList . enable.auto.commit:false
 | |
| #$keyList = $keyList . ,
 | |
| #$keyList = $keyList . auto.commit.interval.ms:6000
 | |
| #$keyList = $keyList . ,
 | |
| #$keyList = $keyList . auto.offset.reset:earliest
 | |
| $keyList = $keyList . '
 | |
| print ========== key list:  $keyList
 | |
| 
 | |
| $topicNum = 3
 | |
| 
 | |
| #=============================== start consume =============================#
 | |
| 
 | |
| 
 | |
| print ================ test consume from stb
 | |
| print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
 | |
| $topicList = ' . topic_stb_column
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_stb_all
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_stb_function
 | |
| $topicList = $topicList . '
 | |
| 
 | |
| $consumerId    = 0
 | |
| $totalMsgOfStb = $ctbNum * $rowsPerCtb
 | |
| $totalMsgOfStb = $totalMsgOfStb * $topicNum
 | |
| $expectmsgcnt  = $topicNum
 | |
| sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $totalMsgOfStb , $ifcheckdata , $ifmanualcommit )
 | |
| $consumerId    = 1
 | |
| sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $totalMsgOfStb , $ifcheckdata , $ifmanualcommit )
 | |
| 
 | |
| print == start consumer to pull msgs from stb
 | |
| print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
 | |
| system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
 | |
| 
 | |
| print == check consume result
 | |
| wait_consumer_end_from_stb:
 | |
| sql select * from consumeresult
 | |
| print ==> rows: $rows 
 | |
| print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
 | |
| print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
 | |
| if $rows != 2 then
 | |
|   sleep 1000
 | |
|   goto wait_consumer_end_from_stb
 | |
| endi
 | |
| if $data[0][1] == 0 then
 | |
|   if $data[1][1] != 1 then
 | |
|     return -1
 | |
|   endi
 | |
| endi
 | |
| if $data[0][1] == 1 then
 | |
|   if $data[1][1] != 0 then
 | |
|     return -1
 | |
|   endi
 | |
| endi
 | |
| 
 | |
| # either $data[0][2] == $expectmsgcnt and $data[1][2] == 0
 | |
| # or     $data[0][2] == 0              and $data[1][2] == $expectmsgcnt
 | |
| if $data[0][2] == $expectmsgcnt then
 | |
|   if $data[1][2] == 0 then
 | |
|     goto check_ok_0
 | |
|   endi
 | |
| elif $data[0][2] == 0 then
 | |
|   if $data[1][2] == $expectmsgcnt then
 | |
|     goto check_ok_0
 | |
|   endi
 | |
| endi
 | |
| return -1
 | |
| check_ok_0:
 | |
| 
 | |
| if $data[0][3] == $totalMsgOfStb then
 | |
|   if $data[1][3] == 0 then
 | |
|     goto check_ok_1
 | |
|   endi
 | |
| elif $data[0][3] == 0 then
 | |
|   if $data[1][3] == $totalMsgOfStb then
 | |
|     goto check_ok_1
 | |
|   endi
 | |
| endi
 | |
| return -1
 | |
| check_ok_1:
 | |
| 
 | |
| #######################################################################################
 | |
| # clear consume info and consume result 
 | |
| #run tsim/tmq/clearConsume.sim
 | |
| # because drop table function no stable, so by create new db for consume info and result. Modify it later
 | |
| $cdbName = cdb1
 | |
| sql create database $cdbName vgroups 1
 | |
| sleep 500
 | |
| sql use $cdbName
 | |
| 
 | |
| print == alter database
 | |
| sql alter database $cdbName wal_retention_period 3600
 | |
| 
 | |
| print == create consume info table and consume result table for ctb
 | |
| sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
 | |
| sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
 | |
| 
 | |
| sql show tables
 | |
| if $rows != 2 then 
 | |
|   return -1
 | |
| endi
 | |
| #######################################################################################
 | |
| 
 | |
| 
 | |
| print ================ test consume from ctb
 | |
| print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
 | |
| $topicList = ' . topic_ctb_column
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ctb_all
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ctb_function
 | |
| $topicList = $topicList . '
 | |
| 
 | |
| $consumerId    = 0
 | |
| $totalMsgOfCtb = $rowsPerCtb * $topicNum
 | |
| $expectmsgcnt  = $topicNum
 | |
| sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $totalMsgOfCtb , $ifcheckdata , $ifmanualcommit )
 | |
| $consumerId    = 1
 | |
| sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $totalMsgOfCtb , $ifcheckdata , $ifmanualcommit )
 | |
| 
 | |
| print == start consumer to pull msgs from ctb
 | |
| print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
 | |
| system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
 | |
| 
 | |
| print == check consume result
 | |
| wait_consumer_end_from_ctb:
 | |
| sql select * from consumeresult
 | |
| print ==> rows: $rows 
 | |
| print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
 | |
| print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
 | |
| if $rows != 2 then
 | |
|   sleep 1000
 | |
|   goto wait_consumer_end_from_ctb
 | |
| endi
 | |
| if $data[0][1] == 0 then
 | |
|   if $data[1][1] != 1 then
 | |
|     return -1
 | |
|   endi
 | |
| endi
 | |
| if $data[0][1] == 1 then
 | |
|   if $data[1][1] != 0 then
 | |
|     return -1
 | |
|   endi
 | |
| endi
 | |
| 
 | |
| # either $data[0][2] == $expectmsgcnt and $data[1][2] == 0
 | |
| # or     $data[0][2] == 0              and $data[1][2] == $expectmsgcnt
 | |
| if $data[0][2] == $expectmsgcnt then
 | |
|   if $data[1][2] == 0 then
 | |
|     goto check_ok_2
 | |
|   endi
 | |
| elif $data[0][2] == 0 then
 | |
|   if $data[1][2] == $expectmsgcnt then
 | |
|     goto check_ok_2
 | |
|   endi
 | |
| endi
 | |
| return -1
 | |
| check_ok_2:
 | |
| 
 | |
| if $data[0][3] == $totalMsgOfCtb then
 | |
|   if $data[1][3] == 0 then
 | |
|     goto check_ok_3
 | |
|   endi
 | |
| elif $data[0][3] == 0 then
 | |
|   if $data[1][3] == $totalMsgOfCtb then
 | |
|     goto check_ok_3
 | |
|   endi
 | |
| endi
 | |
| return -1
 | |
| check_ok_3:
 | |
| 
 | |
| #######################################################################################
 | |
| # clear consume info and consume result 
 | |
| #run tsim/tmq/clearConsume.sim
 | |
| # because drop table function no stable, so by create new db for consume info and result. Modify it later
 | |
| $cdbName = cdb2
 | |
| sql create database $cdbName vgroups 1
 | |
| sleep 500
 | |
| sql use $cdbName
 | |
| 
 | |
| print == alter database
 | |
| sql alter database $cdbName wal_retention_period 3600
 | |
| 
 | |
| print == create consume info table and consume result table for ntb
 | |
| sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
 | |
| sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
 | |
| 
 | |
| sql show tables
 | |
| if $rows != 2 then 
 | |
|   return -1
 | |
| endi
 | |
| #######################################################################################
 | |
| 
 | |
| 
 | |
| print ================ test consume from ntb
 | |
| print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
 | |
| $topicList = ' . topic_ntb_column
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ntb_all
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ntb_function
 | |
| $topicList = $topicList . '
 | |
| 
 | |
| $consumerId    = 0
 | |
| $totalMsgOfNtb = $rowsPerCtb * $topicNum
 | |
| $expectmsgcnt  = $topicNum
 | |
| sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $totalMsgOfNtb , $ifcheckdata , $ifmanualcommit )
 | |
| $consumerId    = 1
 | |
| sql insert into consumeinfo values (now+1s , $consumerId , $topicList , $keyList , $totalMsgOfNtb , $ifcheckdata , $ifmanualcommit )
 | |
| 
 | |
| print == start consumer to pull msgs from ntb
 | |
| print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
 | |
| system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
 | |
| 
 | |
| print == check consume result from ntb
 | |
| wait_consumer_end_from_ntb:
 | |
| sql select * from consumeresult
 | |
| print ==> rows: $rows 
 | |
| print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
 | |
| print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
 | |
| if $rows != 2 then
 | |
|   sleep 1000
 | |
|   goto wait_consumer_end_from_ntb
 | |
| endi
 | |
| if $data[0][1] == 0 then
 | |
|   if $data[1][1] != 1 then
 | |
|     return -1
 | |
|   endi
 | |
| endi
 | |
| if $data[1][1] == 0 then
 | |
|   if $data[0][1] != 1 then
 | |
|     return -1
 | |
|   endi
 | |
| endi
 | |
| 
 | |
| # either $data[0][2] == $expectmsgcnt and $data[1][2] == 0
 | |
| # or     $data[0][2] == 0              and $data[1][2] == $expectmsgcnt
 | |
| if $data[0][2] == $expectmsgcnt then
 | |
|   if $data[1][2] == 0 then
 | |
|     goto check_ok_4
 | |
|   endi
 | |
| elif $data[0][2] == 0 then
 | |
|   if $data[1][2] == $expectmsgcnt then
 | |
|     goto check_ok_4
 | |
|   endi
 | |
| endi
 | |
| return -1
 | |
| check_ok_4:
 | |
| 
 | |
| if $data[0][3] == $totalMsgOfNtb then
 | |
|   if $data[1][3] == 0 then
 | |
|     goto check_ok_5
 | |
|   endi
 | |
| elif $data[0][3] == 0 then
 | |
|   if $data[1][3] == $totalMsgOfNtb then
 | |
|     goto check_ok_5
 | |
|   endi
 | |
| endi
 | |
| return -1
 | |
| check_ok_5:
 | |
| 
 | |
| #------ not need stop consumer, because it exit after pull msg overthan expect msg
 | |
| #system tsim/tmq/consume.sh -s stop -x SIGINT
 | |
| 
 | |
| system sh/exec.sh -n dnode1 -s stop -x SIGINT
 |