test: add topic and consume batch test
This commit is contained in:
parent
9c7b53083b
commit
a143618e74
|
@ -0,0 +1,104 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# The option for wal_retetion_period and wal_retention_size is work well
|
||||
#
|
||||
|
||||
import taos
|
||||
from taos.tmq import Consumer
|
||||
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import json
|
||||
import time
|
||||
import random
|
||||
from datetime import date
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from os import path
|
||||
|
||||
|
||||
topicName = "topic"
|
||||
topicNum = 100
|
||||
|
||||
# consume topic
|
||||
def consume_topic(topic_name, group,consume_cnt, index, wait):
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": group,
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
}
|
||||
)
|
||||
|
||||
print(f"start consumer topic:{topic_name} group={group} index={index} ...")
|
||||
consumer.subscribe([topic_name])
|
||||
cnt = 0
|
||||
try:
|
||||
while True and cnt < consume_cnt:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
if wait:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
cnt += 1
|
||||
print(f" consume {cnt} ")
|
||||
for block in val:
|
||||
datas = block.fetchall()
|
||||
data = datas[0][:50]
|
||||
|
||||
print(f" {topic_name}_{group}_{index} {cnt} {data}")
|
||||
|
||||
finally:
|
||||
consumer.unsubscribe()
|
||||
consumer.close()
|
||||
|
||||
def consumerThread(index):
|
||||
global topicName, topicNum
|
||||
print(f' thread {index} start...')
|
||||
while True:
|
||||
idx = random.randint(0, topicNum - 1)
|
||||
name = f"{topicName}{idx}"
|
||||
group = f"group_{index}_{idx}"
|
||||
consume_topic(name, group, 100, index, True)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(sys.argv)
|
||||
threadCnt = 10
|
||||
|
||||
if len(sys.argv) == 1:
|
||||
threadCnt = int(sys.argv[1])
|
||||
|
||||
|
||||
threads = []
|
||||
print(f'consumer with {threadCnt} threads...')
|
||||
for i in range(threadCnt):
|
||||
x = threading.Thread(target=consumerThread, args=(i,))
|
||||
x.start()
|
||||
threads.append(x)
|
||||
|
||||
# wait
|
||||
for i, thread in enumerate(threads):
|
||||
thread.join()
|
||||
print(f'join thread {i} end.')
|
||||
|
|
@ -0,0 +1,114 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import sys
|
||||
import random
|
||||
import time
|
||||
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
from util.common import *
|
||||
from util.sqlset import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor())
|
||||
self.setsql = TDSetSql()
|
||||
|
||||
# prepareEnv
|
||||
def prepareEnv(self):
|
||||
self.dbName = "mullevel"
|
||||
self.stbName = "meters"
|
||||
self.topicName = "topic"
|
||||
self.topicNum = 100
|
||||
self.loop = 50000
|
||||
|
||||
sql = f"use {self.dbName}"
|
||||
tdSql.execute(sql)
|
||||
|
||||
# generate topic sql
|
||||
self.sqls = [
|
||||
f"select * from {self.stbName}",
|
||||
f"select * from {self.stbName} where ui < 200",
|
||||
f"select * from {self.stbName} where fc > 20.1",
|
||||
f"select * from {self.stbName} where nch like '%%a%%'",
|
||||
f"select * from {self.stbName} where fc > 20.1",
|
||||
f"select lower(bin) from {self.stbName} where length(bin) < 10;",
|
||||
f"select upper(bin) from {self.stbName} where length(nch) > 10;",
|
||||
f"select upper(bin) from {self.stbName} where ti > 10 or ic < 40;",
|
||||
f"select * from {self.stbName} where ic < 100 "
|
||||
]
|
||||
|
||||
|
||||
|
||||
# prepareEnv
|
||||
def createTopics(self):
|
||||
for i in range(self.topicNum):
|
||||
topicName = f"{self.topicName}{i}"
|
||||
sql = random.choice(self.sqls)
|
||||
createSql = f"create topic if not exists {topicName} as {sql}"
|
||||
try:
|
||||
tdSql.execute(createSql, 3, True)
|
||||
except:
|
||||
tdLog.info(f" create topic {topicName} failed.")
|
||||
|
||||
|
||||
# random del topic
|
||||
def managerTopics(self):
|
||||
|
||||
for i in range(self.loop):
|
||||
tdLog.info(f"start modify loop={i}")
|
||||
idx = random.randint(0, self.topicNum - 1)
|
||||
# delete
|
||||
topicName = f"{self.topicName}{idx}"
|
||||
sql = f"drop topic if exist {topicName}"
|
||||
try:
|
||||
tdSql.execute(sql, 3, True)
|
||||
except:
|
||||
tdLog.info(f" drop topic {topicName} failed.")
|
||||
|
||||
|
||||
# create topic
|
||||
sql = random.choice(self.sqls)
|
||||
createSql = f"create topic if not exists {topicName} as {sql}"
|
||||
try:
|
||||
tdSql.execute(createSql, 3, True)
|
||||
except:
|
||||
tdLog.info(f" create topic {topicName} failed.")
|
||||
|
||||
seconds = [0.1, 0.5, 3, 2.5, 1.5, 0.4, 5.2, 2.6, 0.4, 0.2]
|
||||
time.sleep(random.choice(seconds))
|
||||
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
# prepare env
|
||||
self.prepareEnv()
|
||||
|
||||
# create topic
|
||||
self.createTopics()
|
||||
|
||||
# modify topic
|
||||
self.managerTopics()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue