227 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
			
		
		
	
	
			227 lines
		
	
	
		
			8.1 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
| #### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
 | |
| #basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
 | |
| #basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
 | |
| #basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
 | |
| #basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
 | |
| 
 | |
| # notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
 | |
| # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
 | |
| #
 | |
| # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
 | |
| #
 | |
| 
 | |
| run tsim/tmq/prepareBasicEnv-4vgrp.sim
 | |
| 
 | |
| #---- global parameters start ----#
 | |
| $dbName    = db
 | |
| $vgroups    = 4
 | |
| $stbPrefix  = stb
 | |
| $ctbPrefix  = ctb
 | |
| $ntbPrefix  = ntb
 | |
| $stbNum     = 1
 | |
| $ctbNum     = 10
 | |
| $ntbNum     = 10
 | |
| $rowsPerCtb = 10
 | |
| $tstart     = 1640966400000  # 2022-01-01 00:00:00.000
 | |
| #---- global parameters end ----#
 | |
| 
 | |
| $pullDelay    = 3
 | |
| $ifcheckdata  = 1
 | |
| $ifmanualcommit = 1
 | |
| $showMsg      = 1
 | |
| $showRow      = 0
 | |
| 
 | |
| sql connect
 | |
| sql use $dbName
 | |
| 
 | |
| print == create topics from super table
 | |
| sql create topic topic_stb_column as select ts, c3 from stb
 | |
| sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
 | |
| sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
 | |
| 
 | |
| print == create topics from child table
 | |
| sql create topic topic_ctb_column as select ts, c3 from ctb0
 | |
| sql create topic topic_ctb_all as select * from ctb0
 | |
| sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
 | |
| 
 | |
| print == create topics from normal table
 | |
| sql create topic topic_ntb_column as select ts, c3 from ntb0
 | |
| sql create topic topic_ntb_all as select * from ntb0
 | |
| sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
 | |
| 
 | |
| #sql show topics
 | |
| #if $rows != 9 then 
 | |
| #  return -1
 | |
| #endi
 | |
| 
 | |
| #'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
 | |
| $keyList = ' . group.id:cgrp1
 | |
| $keyList = $keyList . ,
 | |
| $keyList = $keyList . enable.auto.commit:false
 | |
| #$keyList = $keyList . ,
 | |
| #$keyList = $keyList . auto.commit.interval.ms:6000
 | |
| #$keyList = $keyList . ,
 | |
| #$keyList = $keyList . auto.offset.reset:earliest
 | |
| $keyList = $keyList . '
 | |
| print ========== key list:  $keyList
 | |
| 
 | |
| 
 | |
| $topicNum = 3
 | |
| 
 | |
| print ================ test consume from stb
 | |
| print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
 | |
| $topicList = ' . topic_stb_column
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_stb_all
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_stb_function
 | |
| $topicList = $topicList . '
 | |
| 
 | |
| $consumerId    = 0
 | |
| $totalMsgOfStb = $ctbNum * $rowsPerCtb
 | |
| $totalMsgOfStb = $totalMsgOfStb * $topicNum
 | |
| $expectmsgcnt  = $totalMsgOfStb
 | |
| sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
 | |
| 
 | |
| print == start consumer to pull msgs from stb
 | |
| print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
 | |
| system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
 | |
| 
 | |
| print == check consume result
 | |
| wait_consumer_end_from_stb:
 | |
| sql select * from consumeresult
 | |
| print ==> rows: $rows 
 | |
| print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
 | |
| if $rows != 1 then
 | |
|   sleep 1000
 | |
|   goto wait_consumer_end_from_stb
 | |
| endi
 | |
| if $data[0][1] != $consumerId then
 | |
|   return -1
 | |
| endi
 | |
| if $data[0][2] != $expectmsgcnt then
 | |
|   return -1
 | |
| endi
 | |
| if $data[0][3] != $expectmsgcnt then
 | |
|   return -1
 | |
| endi
 | |
| 
 | |
| #######################################################################################
 | |
| # clear consume info and consume result 
 | |
| #run tsim/tmq/clearConsume.sim
 | |
| # because drop table function no stable, so by create new db for consume info and result. Modify it later
 | |
| $cdbName = cdb1
 | |
| sql create database $cdbName vgroups 1
 | |
| sleep 500
 | |
| sql use $cdbName
 | |
| 
 | |
| print == create consume info table and consume result table
 | |
| sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
 | |
| sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
 | |
| 
 | |
| sql show tables
 | |
| if $rows != 2 then 
 | |
|   return -1
 | |
| endi
 | |
| #######################################################################################
 | |
| 
 | |
| 
 | |
| print ================ test consume from ctb
 | |
| print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
 | |
| $topicList = ' . topic_ctb_column
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ctb_all
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ctb_function
 | |
| $topicList = $topicList . '
 | |
| 
 | |
| $consumerId    = 0
 | |
| $totalMsgOfCtb = $rowsPerCtb * $topicNum
 | |
| $expectmsgcnt  = $totalMsgOfCtb
 | |
| sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
 | |
| 
 | |
| print == start consumer to pull msgs from ctb
 | |
| print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
 | |
| system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
 | |
| 
 | |
| print == check consume result
 | |
| wait_consumer_end_from_ctb:
 | |
| sql select * from consumeresult
 | |
| print ==> rows: $rows 
 | |
| print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
 | |
| if $rows != 1 then
 | |
|   sleep 1000
 | |
|   goto wait_consumer_end_from_ctb
 | |
| endi
 | |
| if $data[0][1] != $consumerId then
 | |
|   return -1
 | |
| endi
 | |
| if $data[0][2] != $totalMsgOfCtb then
 | |
|   return -1
 | |
| endi
 | |
| if $data[0][3] != $totalMsgOfCtb then
 | |
|   return -1
 | |
| endi
 | |
| 
 | |
| #######################################################################################
 | |
| # clear consume info and consume result 
 | |
| #run tsim/tmq/clearConsume.sim
 | |
| # because drop table function no stable, so by create new db for consume info and result. Modify it later
 | |
| $cdbName = cdb2
 | |
| sql create database $cdbName vgroups 1
 | |
| sleep 500
 | |
| sql use $cdbName
 | |
| 
 | |
| print == create consume info table and consume result table
 | |
| sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
 | |
| sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
 | |
| 
 | |
| sql show tables
 | |
| if $rows != 2 then 
 | |
|   return -1
 | |
| endi
 | |
| #######################################################################################
 | |
| 
 | |
| 
 | |
| print ================ test consume from ntb
 | |
| print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
 | |
| $topicList = ' . topic_ntb_column
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ntb_all
 | |
| $topicList = $topicList . ,
 | |
| $topicList = $topicList . topic_ntb_function
 | |
| $topicList = $topicList . '
 | |
| 
 | |
| $consumerId    = 0
 | |
| $totalMsgOfNtb = $rowsPerCtb * $topicNum
 | |
| $expectmsgcnt  = $totalMsgOfNtb
 | |
| sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
 | |
| 
 | |
| print == start consumer to pull msgs from ntb
 | |
| print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
 | |
| system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
 | |
| 
 | |
| print == check consume result from ntb
 | |
| wait_consumer_end_from_ntb:
 | |
| sql select * from consumeresult
 | |
| print ==> rows: $rows 
 | |
| print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
 | |
| if $rows != 1 then
 | |
|   sleep 1000
 | |
|   goto wait_consumer_end_from_ntb
 | |
| endi
 | |
| if $data[0][1] != $consumerId then
 | |
|   return -1
 | |
| endi
 | |
| if $data[0][2] != $totalMsgOfNtb then
 | |
|   return -1
 | |
| endi
 | |
| if $data[0][3] != $totalMsgOfNtb then
 | |
|   return -1
 | |
| endi
 | |
| 
 | |
| #------ not need stop consumer, because it exit after pull msg overthan expect msg
 | |
| #system tsim/tmq/consume.sh -s stop -x SIGINT
 | |
| 
 | |
| system sh/exec.sh -n dnode1 -s stop -x SIGINT
 |