Merge pull request #28422 from taosdata/feat/TD-32441-3.0

fix: subprocess.Popen redirected to PIPE; the pipe buffer fills up while …
Alex Duan 2024-10-20 20:38:24 +08:00 committed by GitHub
commit 7d81dad31f
5 changed files with 100 additions and 93 deletions
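
The bug class behind this fix: when a child process's stdout and stderr are redirected to subprocess.PIPE and the parent only calls wait() without draining the pipes, the child blocks as soon as it has written a pipe buffer's worth of output (roughly 64 KB on Linux), and wait() never returns. A minimal sketch of the hazard, with the stdlib escape hatch for comparison (communicate() is illustration only; this PR's actual fix redirects to files instead):

    import subprocess

    # A child that writes well past the OS pipe buffer before exiting.
    cmd = "python3 -c \"import sys; sys.stdout.write('x' * 1000000)\""
    proc = subprocess.Popen(cmd, shell=True,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # proc.wait(60)  # hazard: the pipe fills, the child blocks on write,
    #                # and wait() hangs (or raises TimeoutExpired).

    out, err = proc.communicate(timeout=60)  # drains both pipes while waiting
    print(len(out))  # 1000000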

View File

@@ -8,15 +8,14 @@
     "confirm_parameter_prompt": "no",
     "continue_if_fail": "yes",
     "databases": "dbrate",
-    "query_times": 20,
+    "query_times": 5,
     "query_mode": "taosc",
     "specified_table_query": {
         "query_interval": 0,
-        "concurrent": 10,
+        "threads": 10,
         "sqls": [
             {
-                "sql": "select count(*) from meters",
-                "result": "./query_result.txt"
+                "sql": "select count(*) from meters"
             }
         ]
     }

View File

@@ -17,7 +17,9 @@
     "dbinfo": {
         "name": "dbrate",
         "vgroups": 1,
-        "drop": "yes"
+        "drop": "yes",
+        "wal_retention_size": 1,
+        "wal_retention_period": 1
     },
     "super_tables": [
         {
@@ -27,7 +29,7 @@
             "childtable_prefix": "d",
             "insert_mode": "@STMT_MODE",
             "interlace_rows": @INTERLACE_MODE,
-            "insert_rows": 100000,
+            "insert_rows": 10000,
             "timestamp_step": 1,
             "start_timestamp": "2020-10-01 00:00:00.000",
             "auto_create_table": "no",

View File

@@ -34,28 +34,6 @@ def exec(command, show=True):
         print(f"exec {command}\n")
     return os.system(command)

-# run return output and error
-def run(command, timeout = 60, show=True):
-    if(show):
-        print(f"run {command} timeout={timeout}s\n")
-    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    process.wait(timeout)
-    output = process.stdout.read().decode(encoding="gbk")
-    error = process.stderr.read().decode(encoding="gbk")
-    return output, error
-
-# return list after run
-def runRetList(command, timeout=10, first=True):
-    output,error = run(command, timeout)
-    if first:
-        return output.splitlines()
-    else:
-        return error.splitlines()
-
 def readFileContext(filename):
     file = open(filename)
     context = file.read()
@@ -78,6 +56,27 @@ def appendFileContext(filename, context):
     except:
         print(f"appand file error context={context} .")

+# run return output and error
+def run(command, show=True):
+    # out to file
+    out = "out.txt"
+    err = "err.txt"
+    ret = exec(command + f" 1>{out} 2>{err}", True)
+    # read from file
+    output = readFileContext(out)
+    error = readFileContext(err)
+    return output, error
+
+# return list after run
+def runRetList(command, first=True):
+    output,error = run(command)
+    if first:
+        return output.splitlines()
+    else:
+        return error.splitlines()
+
 def getFolderSize(folder):
     total_size = 0
     for dirpath, dirnames, filenames in os.walk(folder):
@@ -134,8 +133,6 @@ def getMatch(datatype, algo):
 def generateJsonFile(stmt, interlace):
     print(f"doTest stmt: {stmt} interlace_rows={interlace}\n")
     # replace datatype
     context = readFileContext(templateFile)
     # replace compress
@@ -204,9 +201,16 @@ def writeTemplateInfo(resultFile):
     insertRows = findContextValue(context, "insert_rows")
     bindVGroup = findContextValue(context, "thread_bind_vgroup")
     nThread = findContextValue(context, "thread_count")
+    batch = findContextValue(context, "num_of_records_per_req")
     if bindVGroup.lower().find("yes") != -1:
         nThread = vgroups
-    line = f"thread_bind_vgroup = {bindVGroup}\nvgroups = {vgroups}\nchildtable_count = {childCount}\ninsert_rows = {insertRows}\ninsertThreads = {nThread} \n\n"
+    line = f"thread_bind_vgroup = {bindVGroup}\n"
+    line += f"vgroups = {vgroups}\n"
+    line += f"childtable_count = {childCount}\n"
+    line += f"insert_rows = {insertRows}\n"
+    line += f"insertThreads = {nThread}\n"
+    line += f"batchSize = {batch}\n\n"
     print(line)
     appendFileContext(resultFile, line)
@@ -247,14 +251,8 @@ def totalCompressRate(stmt, interlace, resultFile, spent, spentReal, writeSpeed,
     # %("No", "stmtMode", "interlaceRows", "spent", "spent-real", "writeSpeed", "write-real", "query-QPS", "dataSize", "rate")
     Number += 1
-    '''
-    context = "%2s %6s %10s %10s %10s %15s %15s %16s %16s %16s %16s %16s %8s %8s %8s\n"%(
-        Number, stmt, interlace, spent + "s", spentReal + "s", writeSpeed + " rows/s", writeReal + " rows/s",
-        min, avg, p90, p99, max,
-        querySpeed, str(totalSize) + " MB", rate + "%")
-    '''
     context = "%2s %8s %10s %10s %16s %16s %12s %12s %12s %12s %12s %12s %10s %10s %10s\n"%(
-        Number, stmt, interlace, spent + "s", spentReal + "s", writeSpeed + "r/s", writeReal + "r/s",
+        Number, stmt, interlace, spent + "s", spentReal + "s", writeSpeed + " r/s", writeReal + " r/s",
         min, avg, p90, p99, max + "ms",
         querySpeed, str(totalSize) + " MB", rate + "%")
@@ -323,7 +321,7 @@ def testWrite(jsonFile):
 def testQuery():
     command = f"taosBenchmark -f json/query.json"
-    lines = runRetList(command, 60000)
+    lines = runRetList(command)
     # INFO: Spend 6.7350 second completed total queries: 10, the QPS of all threads: 1.485
     speed = None
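
The replacement run() above sidesteps the pipe entirely: the child's streams go to ordinary files via shell redirection, and the parent reads the files back after the process exits. A file has no fixed-size buffer for the child to block on. A self-contained sketch of the same pattern (standalone rewrite for illustration, not the repo's exact helpers):

    import os

    def run(command, out="out.txt", err="err.txt"):
        # Shell-redirect the child's stdout/stderr to files: a regular file
        # has no pipe-buffer limit, so the child can never block on write.
        os.system(f"{command} 1>{out} 2>{err}")
        with open(out) as f:
            output = f.read()
        with open(err) as f:
            error = f.read()
        return output, error

    output, error = run("echo hello")
    print(output.strip())  # hello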

View File

@@ -8,15 +8,14 @@
     "confirm_parameter_prompt": "no",
     "continue_if_fail": "yes",
     "databases": "dbrate",
-    "query_times": 20,
+    "query_times": 5,
     "query_mode": "taosc",
     "specified_table_query": {
         "query_interval": 0,
-        "concurrent": 10,
+        "threads": 10,
         "sqls": [
             {
-                "sql": "select * from meters",
-                "result": "./query_res0.txt"
+                "sql": "select * from meters"
             }
         ]
     }

View File

@@ -34,28 +34,6 @@ def exec(command, show=True):
         print(f"exec {command}\n")
     return os.system(command)

-# run return output and error
-def run(command, timeout = 60, show=True):
-    if(show):
-        print(f"run {command} timeout={timeout}s\n")
-    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-    process.wait(timeout)
-    output = process.stdout.read().decode(encoding="gbk")
-    error = process.stderr.read().decode(encoding="gbk")
-    return output, error
-
-# return list after run
-def runRetList(command, timeout=10, first=True):
-    output,error = run(command, timeout)
-    if first:
-        return output.splitlines()
-    else:
-        return error.splitlines()
-
 def readFileContext(filename):
     file = open(filename)
     context = file.read()
@@ -78,6 +56,27 @@ def appendFileContext(filename, context):
     except:
         print(f"appand file error context={context} .")

+# run return output and error
+def run(command, show=True):
+    # out to file
+    out = "out.txt"
+    err = "err.txt"
+    ret = exec(command + f" 1>{out} 2>{err}", True)
+    # read from file
+    output = readFileContext(out)
+    error = readFileContext(err)
+    return output, error
+
+# return list after run
+def runRetList(command, first=True):
+    output,error = run(command)
+    if first:
+        return output.splitlines()
+    else:
+        return error.splitlines()
+
 def getFolderSize(folder):
     total_size = 0
     for dirpath, dirnames, filenames in os.walk(folder):
@@ -196,48 +195,55 @@ def findContextValue(context, label):
 def writeTemplateInfo(resultFile):
     # create info
     context = readFileContext(templateFile)
+    dbname = findContextValue(context, "name")
     vgroups = findContextValue(context, "vgroups")
     childCount = findContextValue(context, "childtable_count")
     insertRows = findContextValue(context, "insert_rows")
     line = f"vgroups = {vgroups}\nchildtable_count = {childCount}\ninsert_rows = {insertRows}\n\n"
     print(line)
     appendFileContext(resultFile, line)
+    return dbname

 def totalCompressRate(algo, resultFile, writeSpeed, querySpeed):
     global Number
-    # flush
-    command = 'taos -s "flush database dbrate;"'
-    rets = exec(command)
-    command = 'taos -s "compact database dbrate;"'
-    rets = exec(command)
-    waitCompactFinish(60)
-    # read compress rate
-    command = 'taos -s "show table distributed dbrate.meters\G;"'
-    rets = runRetList(command)
-    print(rets)
-    # total data file size
-    #dataSize = getFolderSize(f"{dataDir}/vnode/")
-    #dataSizeMB = int(dataSize/1024/1024)
-    str1 = rets[5]
-    arr = str1.split(" ")
-    # appand to file
-    # Total_Size KB
-    str2 = arr[2]
-    pos = str2.find("=[")
-    totalSize = int(float(str2[pos+2:])/1024)
-    # Compression_Ratio
-    str2 = arr[6]
-    pos = str2.find("=[")
-    rate = str2[pos+2:]
-    print("rate =" + rate)
+    loop = 30
+    while loop > 0:
+        loop -= 1
+        # flush database
+        command = 'taos -s "flush database dbrate;"'
+        exec(command)
+        time.sleep(1)
+        # read compress rate
+        command = 'taos -s "show table distributed dbrate.meters\G;"'
+        rets = runRetList(command)
+        print(rets)
+        str1 = rets[5]
+        arr = str1.split(" ")
+        # Total_Size KB
+        str2 = arr[2]
+        pos = str2.find("=[")
+        totalSize = int(float(str2[pos+2:])/1024)
+        # Compression_Ratio
+        str2 = arr[6]
+        pos = str2.find("=[")
+        rate = str2[pos+2:]
+        print("rate =" + rate)
+        if rate != "0.00":
+            break
+    # total data file size
+    #dataSize = getFolderSize(f"{dataDir}/vnode/")
+    #dataSizeMB = int(dataSize/1024/1024)
+    # appand to file
     Number += 1
     context = "%10s %10s %10s %10s %30s %15s\n"%( Number, algo, str(totalSize)+" MB", rate+"%", writeSpeed + " Records/second", querySpeed)
     showLog(context)
@@ -269,11 +275,15 @@ def testWrite(jsonFile):
     speed = context[pos: end]
     #print(f"write pos ={pos} end={end} speed={speed}\n output={context} \n")
+    # flush database
+    command = 'taos -s "flush database dbrate;"'
+    exec(command)
     return speed

 def testQuery():
     command = f"taosBenchmark -f json/query.json"
-    lines = runRetList(command, 60000)
+    lines = runRetList(command)
     # INFO: Spend 6.7350 second completed total queries: 10, the QPS of all threads: 1.485
     speed = None
@@ -296,7 +306,6 @@ def testQuery():
 def doTest(algo, resultFile):
     print(f"doTest algo: {algo} \n")
-    #cleanAndStartTaosd()
     # json
     jsonFile = generateJsonFile(algo)
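
The other behavioral change in this script is in totalCompressRate(): instead of compacting and waiting a fixed 60 seconds, it now flushes, sleeps one second, and re-reads "show table distributed" until Compression_Ratio is non-zero, giving up after 30 tries. The same poll-until-ready shape as a standalone helper (a sketch; the names and factoring are hypothetical, not from the repo):

    import time

    def poll_until(read, ready, retries=30, delay=1.0):
        # Re-read a slowly materializing value until it looks ready or we
        # run out of retries; the last reading is returned either way.
        value = None
        for _ in range(retries):
            value = read()
            if ready(value):
                break
            time.sleep(delay)
        return value

    # e.g. (hypothetical): rate = poll_until(read_compression_ratio,
    #                                        lambda r: r != "0.00")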