test: add forbid stream and topic case to splitVGroup.py
This commit is contained in:
parent
f523eba5ce
commit
fa4873df3c
|
@ -328,9 +328,28 @@ class TDTestCase:
|
|||
tdLog.exit("split vgroup transaction is not finished after executing 50s")
|
||||
return False
|
||||
|
||||
# split error
|
||||
def expectSplitError(self, dbName):
|
||||
vgids = self.getVGroup(dbName)
|
||||
selid = random.choice(vgids)
|
||||
sql = f"split vgroup {selid}"
|
||||
tdLog.info(sql)
|
||||
tdSql.error(sql)
|
||||
|
||||
# expect split ok
|
||||
def expectSplitOk(self, dbName):
|
||||
# split vgroup
|
||||
vgList1 = self.getVGroup(dbName)
|
||||
self.splitVGroup(dbName)
|
||||
vgList2 = self.getVGroup(dbName)
|
||||
vgNum1 = len(vgList1) + 1
|
||||
vgNum2 = len(vgList2)
|
||||
if vgNum1 != vgNum2:
|
||||
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
|
||||
return
|
||||
|
||||
# split empty database
|
||||
def splitEmptyDB(self):
|
||||
|
||||
def splitEmptyDB(self):
|
||||
dbName = "emptydb"
|
||||
vgNum = 2
|
||||
# create database
|
||||
|
@ -339,17 +358,33 @@ class TDTestCase:
|
|||
tdSql.execute(sql)
|
||||
|
||||
# split vgroup
|
||||
self.splitVGroup(dbName)
|
||||
vgList = self.getVGroup(dbName)
|
||||
vgNum1 = len(vgList)
|
||||
vgNum2 = vgNum + 1
|
||||
if vgNum1 != vgNum2:
|
||||
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
|
||||
return
|
||||
self.expectSplitOk(dbName)
|
||||
|
||||
|
||||
# forbid
|
||||
def checkForbid(self):
|
||||
# stream
|
||||
tdLog.info("check forbid split having stream...")
|
||||
tdSql.execute("create database streamdb;")
|
||||
tdSql.execute("use streamdb;")
|
||||
tdSql.execute("create table ta(ts timestamp, age int);")
|
||||
tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);")
|
||||
self.expectSplitError("streamdb")
|
||||
tdSql.execute("drop stream ma;")
|
||||
self.expectSplitOk("streamdb")
|
||||
|
||||
# topic
|
||||
tdLog.info("check forbid split having topic...")
|
||||
tdSql.execute("create database topicdb wal_retention_period 10;")
|
||||
tdSql.execute("use topicdb;")
|
||||
tdSql.execute("create table ta(ts timestamp, age int);")
|
||||
tdSql.execute("create topic toa as select * from ta;")
|
||||
self.expectSplitError("topicdb")
|
||||
tdSql.execute("drop topic toa;")
|
||||
self.expectSplitOk("topicdb")
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
|
||||
# prepare env
|
||||
self.prepareEnv()
|
||||
|
||||
|
@ -360,12 +395,13 @@ class TDTestCase:
|
|||
|
||||
# check two db query result same
|
||||
self.checkResult()
|
||||
|
||||
tdLog.info(f"split vgroup i={i} passed.")
|
||||
|
||||
# split empty db
|
||||
self.splitEmptyDB()
|
||||
self.splitEmptyDB()
|
||||
|
||||
# check topic and stream forib
|
||||
self.checkForbid()
|
||||
|
||||
# stop
|
||||
def stop(self):
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# The option for wal_retetion_period and wal_retention_size is work well
|
||||
#
|
||||
|
||||
import taos
|
||||
from taos.tmq import Consumer
|
||||
|
||||
import os
|
||||
import sys
|
||||
import threading
|
||||
import json
|
||||
import time
|
||||
from datetime import date
|
||||
from datetime import datetime
|
||||
from datetime import timedelta
|
||||
from os import path
|
||||
|
||||
|
||||
# consume topic
|
||||
def consume_topic(topic_name, consume_cnt, wait):
|
||||
print("start consume...")
|
||||
consumer = Consumer(
|
||||
{
|
||||
"group.id": "tg2",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"enable.auto.commit": "true",
|
||||
}
|
||||
)
|
||||
print("start subscrite...")
|
||||
consumer.subscribe([topic_name])
|
||||
|
||||
cnt = 0
|
||||
try:
|
||||
while True and cnt < consume_cnt:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
if wait:
|
||||
continue
|
||||
else:
|
||||
break
|
||||
err = res.error()
|
||||
if err is not None:
|
||||
raise err
|
||||
val = res.value()
|
||||
cnt += 1
|
||||
print(f" consume {cnt} ")
|
||||
for block in val:
|
||||
print(block.fetchall())
|
||||
finally:
|
||||
consumer.unsubscribe()
|
||||
consumer.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(sys.argv)
|
||||
if len(sys.argv) < 2:
|
||||
|
||||
print(" please input topic name for consume . -c for wait")
|
||||
else:
|
||||
wait = False
|
||||
if "-c" == sys.argv[1]:
|
||||
wait = True
|
||||
topic = sys.argv[2]
|
||||
else:
|
||||
topic = sys.argv[1]
|
||||
|
||||
print(f' wait={wait} topic={topic}')
|
||||
consume_topic(topic, 10000000, wait)
|
|
@ -0,0 +1,84 @@
|
|||
import time
|
||||
import os
|
||||
import subprocess
|
||||
import random
|
||||
import platform
|
||||
|
||||
class dnode():
|
||||
def __init__(self, pid, path):
|
||||
self.pid = pid
|
||||
self.path = path
|
||||
|
||||
# run exePath no wait finished
|
||||
def runNoWait(exePath):
|
||||
if platform.system().lower() == 'windows':
|
||||
cmd = f"mintty -h never {exePath}"
|
||||
else:
|
||||
cmd = f"nohup {exePath} > /dev/null 2>&1 & "
|
||||
|
||||
if os.system(cmd) != 0:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
# get online dnodes
|
||||
def getDnodes():
|
||||
cmd = "ps aux | grep taosd | awk '{{print $2,$11,$12,$13}}'"
|
||||
result = os.system(cmd)
|
||||
result=subprocess.check_output(cmd,shell=True)
|
||||
strout = result.decode('utf-8').split("\n")
|
||||
dnodes = []
|
||||
|
||||
for line in strout:
|
||||
cols = line.split(' ')
|
||||
if len(cols) != 4:
|
||||
continue
|
||||
exepath = cols[1]
|
||||
if len(exepath) < 5 :
|
||||
continue
|
||||
if exepath[-5:] != 'taosd':
|
||||
continue
|
||||
|
||||
# add to list
|
||||
path = cols[1] + " " + cols[2] + " " + cols[3]
|
||||
dnodes.append(dnode(cols[0], path))
|
||||
|
||||
print(" show dnodes cnt=%d...\n"%(len(dnodes)))
|
||||
for dn in dnodes:
|
||||
print(f" pid={dn.pid} path={dn.path}")
|
||||
|
||||
return dnodes
|
||||
|
||||
def restartDnodes(dnodes, cnt, seconds):
|
||||
print(f"start dnode cnt={cnt} wait={seconds}s")
|
||||
selects = random.sample(dnodes, cnt)
|
||||
for select in selects:
|
||||
print(f" kill -9 {select.pid}")
|
||||
cmd = f"kill -9 {select.pid}"
|
||||
os.system(cmd)
|
||||
print(f" restart {select.path}")
|
||||
if runNoWait(select.path) == False:
|
||||
print(f"run {select.path} failed.")
|
||||
raise Exception("exe failed.")
|
||||
print(f" sleep {seconds}s ...")
|
||||
time.sleep(seconds)
|
||||
|
||||
def run():
|
||||
# kill seconds interval
|
||||
killLoop = 10
|
||||
minKill = 1
|
||||
maxKill = 10
|
||||
for i in range(killLoop):
|
||||
dnodes = getDnodes()
|
||||
killCnt = 0
|
||||
if len(dnodes) > 0:
|
||||
killCnt = random.randint(1, len(dnodes))
|
||||
restartDnodes(dnodes, killCnt, random.randint(1, 5))
|
||||
|
||||
seconds = random.randint(minKill, maxKill)
|
||||
print(f"----------- kill loop i={i} killCnt={killCnt} done. do sleep {seconds}s ... \n")
|
||||
time.sleep(seconds)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
run()
|
Loading…
Reference in New Issue