From 2e2a3f10e18eb9955c9d5f9b95e6260eeb283868 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Mon, 8 Jun 2020 23:58:09 -0700 Subject: [PATCH 01/26] Added the -l option to crash_gen tool, writing larger amount of data --- tests/pytest/crash_gen.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index c88683aa09..0006247fc9 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -42,7 +42,10 @@ import crash_gen import taos # Global variables, tried to keep a small number. -gConfig = None # Command-line/Environment Configurations, will set a bit later + +# Command-line/Environment Configurations, will set a bit later +# ConfigNameSpace = argparse.Namespace +gConfig = argparse.Namespace() # Dummy value, will be replaced later logger = None def runThread(wt: WorkerThread): @@ -1171,8 +1174,8 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - for i in range(10): # 0 to 9 - for j in range(10) : + for i in range(35 if gConfig.larger_data else 2): # number of regular tables in the super table + for j in range(100 if gConfig.larger_data else 2) : # number of records per table sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( i, ds.getFixedSuperTableName(), @@ -1301,10 +1304,12 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', - help='Use a single shared db connection (default: false)') parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-l', '--larger-data', action='store_true', + help='Write larger amount of data during write operations (default: false)') + parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + help='Use a single shared db connection (default: false)') parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, help='Maximum number of steps to run (default: 100)') parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, From ac1fb970d5f1d6599f1d9a92752a40e18eb11aa3 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 9 Jun 2020 21:30:23 -0700 Subject: [PATCH 02/26] Added overlaping data insertion, plus occasional db connection drops --- tests/pytest/crash_gen.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 0006247fc9..d300a277e7 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -32,6 +32,7 @@ import textwrap from typing import List from typing import Dict +from typing import Set from util.log import * from util.dnodes import * @@ -969,7 +970,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err + self._err = err except: self.logDebug("[=] Unexpected exception") raise @@ -1136,10 +1137,14 @@ class ReadFixedDataTask(StateTransitionTask): sTbName = self._dbState.getFixedSuperTableName() dbc = wt.getDbConn() dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later - rTables = dbc.getQueryResult() - # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations + dbc.close() + dbc.open() + else: + rTables = dbc.getQueryResult() + # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + for rTbName in rTables : # regular tables + dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure # tdSql.query(" cars where tbname in ('carzero', 'carone')") @@ -1160,6 +1165,8 @@ class DropFixedSuperTableTask(StateTransitionTask): wt.execSql("drop table db.{}".format(tblName)) class AddFixedDataTask(StateTransitionTask): + activeTable : Set[int] = set() # Track which table is being actively worked on + @classmethod def getInfo(cls): return [ @@ -1174,14 +1181,24 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - for i in range(35 if gConfig.larger_data else 2): # number of regular tables in the super table - for j in range(100 if gConfig.larger_data else 2) : # number of records per table + tblSeq = list(range(35 if gConfig.larger_data else 2)) + random.shuffle(tblSeq) + for i in tblSeq: + if ( i in self.activeTable ): # wow already active + # logger.info("Concurrent data insertion into table: {}".format(i)) + # print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table + print("x", end="", flush=True) + else: + self.activeTable.add(i) # marking it active + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + for j in range(50 if gConfig.larger_data else 2) : # number of records per table sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( i, ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), ds.getNextTick(), ds.getNextInt()) wt.execSql(sql) + self.activeTable.discard(i) # not raising an error, unlike remove #---------- Non State-Transition Related Tasks ----------# From 511953665d4e95efc9236042049d0e404d328579 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Tue, 9 Jun 2020 23:40:10 -0700 Subject: [PATCH 03/26] Removed crash_gen check conditions not suited for larger (10-thread) parallel processing tests --- tests/pytest/crash_gen.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index d300a277e7..9ae3b4bcbb 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -618,15 +618,15 @@ class StateDbOnly(AnyState): self.assertIfExistThenSuccess(tasks, DropDbTask) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything - self.assertNoTask(tasks, DropDbTask) # should have have tried + # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail # self._state = self.STATE_TABLE_ONLY @@ -686,9 +686,10 @@ class StateHasData(AnyState): self.assertNoTask(tasks, DropFixedSuperTableTask) self.assertNoTask(tasks, AddFixedDataTask) # self.hasSuccess(tasks, DeleteDataTasks) - else: + else: # should be STATE_HAS_DATA self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) + if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table + self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) From c6b71abc468635a93b19fddad02d68dfa6fc6af1 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 10 Jun 2020 00:11:50 -0700 Subject: [PATCH 04/26] Minor tweak of crash_gen script --- tests/pytest/crash_gen.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 9ae3b4bcbb..486edb1ba0 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -675,7 +675,8 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): self.hasSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + if ( not self.hasTask(tasks, CreateDbTask) ) : + self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task self.assertNoTask(tasks, DropDbTask) # we must have drop_db task From 7dd6e6a442c8d33f2c722a6920e83a67a9c02f1d Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 11 Jun 2020 18:50:49 -0700 Subject: [PATCH 05/26] removed a check condition from crash_gen --- tests/pytest/crash_gen.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 486edb1ba0..7fe321a3eb 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -690,8 +690,8 @@ class StateHasData(AnyState): else: # should be STATE_HAS_DATA self.assertNoTask(tasks, DropDbTask) if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table - self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it - self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) + self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it + # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) # State of the database as we believe it to be From 815cb83904b0b543388c7c3f66be9187f77c28b3 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 12 Jun 2020 23:56:53 -0700 Subject: [PATCH 06/26] Added -r option for crash_gen.sh, recording data opersions, for power-off tests --- tests/pytest/crash_gen.py | 42 ++++++++++++++++++++++++++++++++++----- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index 7fe321a3eb..e933a865d0 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import os import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -1168,6 +1169,24 @@ class DropFixedSuperTableTask(StateTransitionTask): class AddFixedDataTask(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + + # We use these two files to record operations to DB, useful for power-off tests + fAddLogReady = None + fAddLogDone = None + + @classmethod + def prepToRecordOps(cls): + if gConfig.record_ops : + if ( cls.fAddLogReady == None ): + logger.info("Recording in a file operations to be performed...") + cls.fAddLogReady = open("add_log_ready.txt", "w") + if ( cls.fAddLogDone == None ): + logger.info("Recording in a file operations completed...") + cls.fAddLogDone = open("add_log_done.txt", "w") @classmethod def getInfo(cls): @@ -1183,7 +1202,7 @@ class AddFixedDataTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): ds = self._dbState wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - tblSeq = list(range(35 if gConfig.larger_data else 2)) + tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) for i in tblSeq: if ( i in self.activeTable ): # wow already active @@ -1193,13 +1212,24 @@ class AddFixedDataTask(StateTransitionTask): else: self.activeTable.add(i) # marking it active # No need to shuffle data sequence, unless later we decide to do non-increment insertion - for j in range(50 if gConfig.larger_data else 2) : # number of records per table - sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( - i, + for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table + nextInt = ds.getNextInt() + regTableName = "db.reg_table_{}".format(i) + if gConfig.record_ops: + self.prepToRecordOps() + self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) + self.fAddLogReady.flush() + os.fsync(self.fAddLogReady) + sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( + regTableName, ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), - ds.getNextTick(), ds.getNextInt()) + ds.getNextTick(), nextInt) wt.execSql(sql) + if gConfig.record_ops: + self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.flush() + os.fsync(self.fAddLogDone) self.activeTable.discard(i) # not raising an error, unlike remove @@ -1329,6 +1359,8 @@ def main(): help='Write larger amount of data during write operations (default: false)') parser.add_argument('-p', '--per-thread-db-connection', action='store_true', help='Use a single shared db connection (default: false)') + parser.add_argument('-r', '--record-ops', action='store_true', + help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, help='Maximum number of steps to run (default: 100)') parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, From 19302ef8a344dabe86d46ae0a4fc1b3dd42f4913 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 25 Jun 2020 11:47:31 +0800 Subject: [PATCH 07/26] test hash performance --- tests/test/c/CMakeLists.txt | 15 ++-- tests/test/c/hashPerformance.c | 131 +++++++++++++++++++++++++++++++++ 2 files changed, 140 insertions(+), 6 deletions(-) create mode 100644 tests/test/c/hashPerformance.c diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index d40db5ee40..3b09342867 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -7,15 +7,18 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) - add_executable(insertPerTable insertPerTable.c) - target_link_libraries(insertPerTable taos_static pthread) + #add_executable(insertPerTable insertPerTable.c) + #target_link_libraries(insertPerTable taos_static pthread) add_executable(insertPerRow insertPerRow.c) target_link_libraries(insertPerRow taos_static pthread) - add_executable(importOneRow importOneRow.c) - target_link_libraries(importOneRow taos_static pthread) + #add_executable(importOneRow importOneRow.c) + #target_link_libraries(importOneRow taos_static pthread) - add_executable(importPerTable importPerTable.c) - target_link_libraries(importPerTable taos_static pthread) + #add_executable(importPerTable importPerTable.c) + #target_link_libraries(importPerTable taos_static pthread) + + add_executable(hashPerformance hashPerformance.c) + target_link_libraries(hashPerformance taos_static tutil common pthread) ENDIF() diff --git a/tests/test/c/hashPerformance.c b/tests/test/c/hashPerformance.c new file mode 100644 index 0000000000..fa3612d6e8 --- /dev/null +++ b/tests/test/c/hashPerformance.c @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tulog.h" +#include "ttime.h" +#include "tutil.h" +#include "hash.h" + +#define MAX_RANDOM_POINTS 20000 +#define GREEN "\033[1;32m" +#define NC "\033[0m" + +int32_t capacity = 100000; +int32_t q1Times = 1; +int32_t q2Times = 1; +int32_t keyNum = 100000; +int32_t printInterval = 10000; + +typedef struct HashTestRow { + int32_t size; + void * ptr; +} HashTestRow; + +void shellParseArgument(int argc, char *argv[]); + +void testHashPerformance() { + int64_t initialMs = taosGetTimestampMs(); + _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + void * hashHandle = taosHashInit(capacity, hashFp, true); + + int64_t startMs = taosGetTimestampMs(); + float seconds = (startMs - initialMs) / 1000.0; + pPrint("initial time %.2f sec", seconds); + + for (int32_t t = 1; t <= keyNum; ++t) { + HashTestRow row = {0}; + char key[100] = {0}; + int32_t keySize = sprintf(key, "0.db.st%d", t); + + for (int32_t q = 0; q < q1Times; q++) { + taosHashGet(hashHandle, &key, keySize); + } + + taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow)); + + for (int32_t q = 0; q < q2Times; q++) { + taosHashGet(hashHandle, &key, keySize); + } + + if (t % printInterval == 0) { + int64_t endMs = taosGetTimestampMs(); + int64_t hashSize = taosHashGetSize(hashHandle); + float seconds = (endMs - startMs) / 1000.0; + float speed = printInterval / seconds; + pPrint("time:%.2f sec, speed:%.1f rows/second, hashSize:%ld", seconds, speed, hashSize); + startMs = endMs; + } + } + + int64_t endMs = taosGetTimestampMs(); + int64_t hashSize = taosHashGetSize(hashHandle); + seconds = (endMs - initialMs) / 1000.0; + float speed = hashSize / seconds; + + pPrint("total time:%.2f sec, avg speed:%.1f rows/second, hashSize:%ld", seconds, speed, hashSize); + taosHashCleanup(hashHandle); +} + +int main(int argc, char *argv[]) { + shellParseArgument(argc, argv); + testHashPerformance(); +} + +void printHelp() { + char indent[10] = " "; + printf("Used to test the performance of cache\n"); + + printf("%s%s\n", indent, "-k"); + printf("%s%s%s%d\n", indent, indent, "key num, default is ", keyNum); + printf("%s%s\n", indent, "-p"); + printf("%s%s%s%d\n", indent, indent, "print interval while put into hash, default is ", printInterval); + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%d\n", indent, indent, "the initial capacity of hash ", capacity); + printf("%s%s\n", indent, "-q1"); + printf("%s%s%s%d\n", indent, indent, "query times before put into hash", q1Times); + printf("%s%s\n", indent, "-q2"); + printf("%s%s%s%d\n", indent, indent, "query times after put into hash", q2Times); + + exit(EXIT_SUCCESS); +} + +void shellParseArgument(int argc, char *argv[]) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-k") == 0) { + keyNum = atoi(argv[++i]); + } else if (strcmp(argv[i], "-p") == 0) { + printInterval = atoi(argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + capacity = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q1") == 0) { + q1Times = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q2") == 0) { + q2Times = atoi(argv[++i]); + } else { + } + } + + pPrint("%s capacity:%d %s", GREEN, capacity, NC); + pPrint("%s printInterval:%d %s", GREEN, printInterval, NC); + pPrint("%s q1Times:%d %s", GREEN, q1Times, NC); + pPrint("%s q2Times:%d %s", GREEN, q2Times, NC); + pPrint("%s keyNum:%d %s", GREEN, keyNum, NC); +} From 5f979568e113fd83d4e3c28176d2985d7ae28e56 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Wed, 24 Jun 2020 21:59:13 -0700 Subject: [PATCH 08/26] Refactored and cleaned up crash_gen tool, ready to add sub-state transitions --- tests/pytest/crash_gen.py | 143 ++++++++++++++------------------------ 1 file changed, 52 insertions(+), 91 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index e933a865d0..bff2403bb8 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -69,7 +69,7 @@ class WorkerThread: # self._curStep = -1 self._pool = pool self._tid = tid - self._tc = tc + self._tc = tc # type: ThreadCoordinator # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() @@ -161,13 +161,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: - return self._tc.getDbState().getDbConn().execute(sql) + return self._tc.getDbManager().getDbConn().execute(sql) def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn else: - return self._tc.getDbState().getDbConn() + return self._tc.getDbManager().getDbConn() # def querySql(self, sql): # not "execute", since we are out side the DB context # if ( gConfig.per_thread_db_connection ): @@ -176,12 +176,12 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbState): + def __init__(self, pool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbState = dbState + self._dbManager = dbManager self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -191,8 +191,8 @@ class ThreadCoordinator: def getTaskExecutor(self): return self._te - def getDbState(self) -> DbState : - return self._dbState + def getDbManager(self) -> DbManager : + return self._dbManager def crossStepBarrier(self): self._stepBarrier.wait() @@ -216,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -289,8 +289,8 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbState().pickTaskType() # pick a task type for current state - return taskType(self.getDbState(), self._execStats) # create a task from it + taskType = self.getDbManager().pickTaskType() # pick a task type for current state + return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -301,16 +301,12 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. class ThreadPool: - def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + def __init__(self, numThreads, maxSteps): self.numThreads = numThreads self.maxSteps = maxSteps - self.funcSequencer = funcSequencer # Internal class variables - # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] - # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -324,7 +320,8 @@ class ThreadPool: logger.debug("Joining thread...") workerThread._thread.join() -# A queue of continguous POSITIVE integers +# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers +# for new table names class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element @@ -600,9 +597,9 @@ class StateEmpty(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB - if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks - self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers + if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers class StateDbOnly(AnyState): def getInfo(self): @@ -614,19 +611,19 @@ class StateDbOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( not self.hasTask(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more - self.assertIfExistThenSuccess(tasks, DropDbTask) + if ( not self.hasTask(tasks, TaskCreateDb) ): + self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more + self.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - if ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): - self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything + if ( not self.hasTask(tasks, TaskDropSuperTable) ): + self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail @@ -650,8 +647,8 @@ class StateSuperTableOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -675,28 +672,28 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): - self.hasSuccess(tasks, DropDbTask) - if ( not self.hasTask(tasks, CreateDbTask) ) : - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + self.hasSuccess(tasks, TaskDropDb) + if ( not self.hasTask(tasks, TaskCreateDb) ) : + self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only - if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task - self.assertNoTask(tasks, DropDbTask) # we must have drop_db task - self.hasSuccess(tasks, DropFixedSuperTableTask) + if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task + self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task + self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertNoTask(tasks, AddFixedDataTask) + self.assertNoTask(tasks, TaskDropDb) + self.assertNoTask(tasks, TaskDropSuperTable) + self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, DropDbTask) - if (not self.hasTask(tasks, CreateFixedSuperTableTask)) : # if we didn't create the table - self.assertNoTask(tasks, DropFixedSuperTableTask) # we should not have a task that drops it + self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table + self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) -# State of the database as we believe it to be -class DbState(): +# Manager of the Database Data/Connection +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() @@ -879,11 +876,11 @@ class DbState(): # Generic Checks, first based on the start state if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, CreateDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskCreateDb) # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, DropDbTask) + self._state.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop # if self._state.canCreateFixedTable(): @@ -930,8 +927,8 @@ class Task(): # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn - def __init__(self, dbState: DbState, execStats: ExecutionStats): - self._dbState = dbState + def __init__(self, dbManager: DbManager, execStats: ExecutionStats): + self._dbState = dbManager self._workerThread = None self._err = None self._curStep = None @@ -1075,7 +1072,7 @@ class StateTransitionTask(Task): -class CreateDbTask(StateTransitionTask): +class TaskCreateDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1090,7 +1087,7 @@ class CreateDbTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DropDbTask(StateTransitionTask): +class TaskDropDb(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1106,7 +1103,7 @@ class DropDbTask(StateTransitionTask): wt.execSql("drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) -class CreateFixedSuperTableTask(StateTransitionTask): +class TaskCreateSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1124,7 +1121,7 @@ class CreateFixedSuperTableTask(StateTransitionTask): # No need to create the regular tables, INSERT will do that automatically -class ReadFixedDataTask(StateTransitionTask): +class TaskReadData(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1151,7 +1148,7 @@ class ReadFixedDataTask(StateTransitionTask): # tdSql.query(" cars where tbname in ('carzero', 'carone')") -class DropFixedSuperTableTask(StateTransitionTask): +class TaskDropSuperTable(StateTransitionTask): @classmethod def getInfo(cls): return [ @@ -1167,7 +1164,7 @@ class DropFixedSuperTableTask(StateTransitionTask): tblName = self._dbState.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) -class AddFixedDataTask(StateTransitionTask): +class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on LARGE_NUMBER_OF_TABLES = 35 SMALL_NUMBER_OF_TABLES = 3 @@ -1233,42 +1230,6 @@ class AddFixedDataTask(StateTransitionTask): self.activeTable.discard(i) # not raising an error, unlike remove -#---------- Non State-Transition Related Tasks ----------# - -class CreateTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self._dbState.addTable() - self.logDebug("Creating a table {} ...".format(tIndex)) - wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - self.logDebug("Table {} created.".format(tIndex)) - self._dbState.releaseTable(tIndex) - -class DropTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tableName = self._dbState.getTableNameToDelete() - if ( not tableName ): # May be "False" - self.logInfo("Cannot generate a table to delete, skipping...") - return - self.logInfo("Dropping a table db.{} ...".format(tableName)) - wt.execSql("drop table db.{}".format(tableName)) - - - -class AddDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) - tIndex = ds.pickAndAllocateTable() - if ( tIndex == None ): - self.logInfo("No table found to add data, skipping...") - return - sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - self.logDebug("[SQL] Executing SQL: {}".format(sql)) - wt.execSql(sql) - ds.releaseTable(tIndex) - self.logDebug("[OPS] Finished adding data") - - # Deterministic random number generator class Dice(): seeded = False # static, uninitialized @@ -1384,12 +1345,12 @@ def main(): # resetDb = False # DEBUG only # dbState = DbState(resetDb) # DBEUG only! - dbState = DbState() # Regular function + dbManager = DbManager() # Regular function Dice.seed(0) # initial seeding of dice tc = ThreadCoordinator( - ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), + ThreadPool(gConfig.num_threads, gConfig.max_steps), # WorkDispatcher(dbState), # Obsolete? - dbState + dbManager ) # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix @@ -1437,7 +1398,7 @@ def main(): tc.run() tc.logStats() - dbState.cleanUp() + dbManager.cleanUp() # logger.info("Crash_Gen execution finished") From 7c30fd22e158d34ce8b655272b2b83fa0a51cfc6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jun 2020 15:41:40 +0800 Subject: [PATCH 09/26] [td-719] fix bugs in affected rows --- src/client/src/tscParseInsert.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index df4ccca9bc..1db4108d22 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1351,6 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) { int32_t code = TSDB_CODE_SUCCESS; SSqlCmd *pCmd = &pSql->cmd; + pSql->res.numOfRows = 0; assert(pCmd->numOfClause == 1); STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; @@ -1394,6 +1395,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { fclose(fp); pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); return; } @@ -1458,8 +1460,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { free(line); if (count > 0) { - if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { + code = doPackSendDataBlock(pSql, count, pTableDataBlock); + if (code != TSDB_CODE_SUCCESS) { pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); + return; } } else { From c8b673558526838c54c3a0ff8eefcef1535c43c3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 25 Jun 2020 16:59:18 +0800 Subject: [PATCH 10/26] [TD-705] --- src/inc/taosdef.h | 2 +- src/mnode/src/mnodeSdb.c | 4 +- tests/test/c/CMakeLists.txt | 11 +- tests/test/c/createTablePerformance.c | 232 ++++++++++++++++++++++++++ 4 files changed, 242 insertions(+), 7 deletions(-) create mode 100644 tests/test/c/createTablePerformance.c diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index eefd9f0c00..ecf78edfd5 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -354,7 +354,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_DEFAULT_DBS_HASH_SIZE 100 #define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100 #define TSDB_DEFAULT_STABLES_HASH_SIZE 100 -#define TSDB_DEFAULT_CTABLES_HASH_SIZE 10000 +#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 #define TSDB_PORT_DNODESHELL 0 #define TSDB_PORT_DNODEDNODE 5 diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index b3dae1c5d4..a9d450aff0 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -250,7 +250,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); } -static int32_t sdbForwardToPeer(SWalHead *pHead) { + static int32_t sdbForwardToPeer(SWalHead *pHead) { if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); @@ -782,7 +782,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->restoredFp = pDesc->restoredFp; _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - if (pTable->keyType == SDB_KEY_STRING) { + if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); } pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true); diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index 3b09342867..4717a4f769 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -10,8 +10,8 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) #add_executable(insertPerTable insertPerTable.c) #target_link_libraries(insertPerTable taos_static pthread) - add_executable(insertPerRow insertPerRow.c) - target_link_libraries(insertPerRow taos_static pthread) + #add_executable(insertPerRow insertPerRow.c) + #target_link_libraries(insertPerRow taos_static pthread) #add_executable(importOneRow importOneRow.c) #target_link_libraries(importOneRow taos_static pthread) @@ -19,6 +19,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) #add_executable(importPerTable importPerTable.c) #target_link_libraries(importPerTable taos_static pthread) - add_executable(hashPerformance hashPerformance.c) - target_link_libraries(hashPerformance taos_static tutil common pthread) + #add_executable(hashPerformance hashPerformance.c) + #target_link_libraries(hashPerformance taos_static tutil common pthread) + + add_executable(createTablePerformance createTablePerformance.c) + target_link_libraries(createTablePerformance taos_static tutil common pthread) ENDIF() diff --git a/tests/test/c/createTablePerformance.c b/tests/test/c/createTablePerformance.c new file mode 100644 index 0000000000..4ab6f98423 --- /dev/null +++ b/tests/test/c/createTablePerformance.c @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tulog.h" +#include "ttime.h" +#include "tutil.h" +#include "tglobal.h" +#include "hash.h" + +#define MAX_RANDOM_POINTS 20000 +#define GREEN "\033[1;32m" +#define NC "\033[0m" + +char dbName[32] = "db"; +char stableName[64] = "st"; +int32_t numOfThreads = 30; +int32_t numOfTables = 100000; +int32_t maxTables = 5000; +int32_t numOfColumns = 2; + +typedef struct { + int32_t tableBeginIndex; + int32_t tableEndIndex; + int32_t threadIndex; + char dbName[32]; + char stableName[64]; + float createTableSpeed; + pthread_t thread; +} SThreadInfo; + +void shellParseArgument(int argc, char *argv[]); +void *threadFunc(void *param); +void createDbAndSTable(); + +int main(int argc, char *argv[]) { + shellParseArgument(argc, argv); + taos_init(); + createDbAndSTable(); + + pPrint("%d threads are spawned to create table", numOfThreads); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); + + int32_t numOfTablesPerThread = numOfTables / numOfThreads; + numOfTables = numOfTablesPerThread * numOfThreads; + for (int i = 0; i < numOfThreads; ++i) { + pInfo[i].tableBeginIndex = i * numOfTablesPerThread; + pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; + pInfo[i].threadIndex = i; + strcpy(pInfo[i].dbName, dbName); + strcpy(pInfo[i].stableName, stableName); + pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); + } + + taosMsleep(300); + for (int i = 0; i < numOfThreads; i++) { + pthread_join(pInfo[i].thread, NULL); + } + + float createTableSpeed = 0; + for (int i = 0; i < numOfThreads; ++i) { + createTableSpeed += pInfo[i].createTableSpeed; + } + + pPrint("%s total speed:%.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC); + + pthread_attr_destroy(&thattr); + free(pInfo); +} + +void createDbAndSTable() { + pPrint("start to create db and stable"); + char qstr[64000]; + + TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (con == NULL) { + pError("failed to connect to DB, reason:%s", taos_errstr(con)); + exit(1); + } + + sprintf(qstr, "create database if not exists %s maxtables %d", dbName, maxTables); + TAOS_RES *pSql = taos_query(con, qstr); + int32_t code = taos_errno(pSql); + if (code != 0) { + pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + + sprintf(qstr, "use %s", dbName); + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + + int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName); + for (int32_t f = 0; f < numOfColumns - 1; ++f) { + len += sprintf(qstr + len, ", f%d double", f); + } + sprintf(qstr + len, ") tags(t int)"); + + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + + taos_close(con); +} + +void *threadFunc(void *param) { + SThreadInfo *pInfo = (SThreadInfo *)param; + char qstr[65000]; + int code; + + TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (con == NULL) { + pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); + exit(1); + } + + sprintf(qstr, "use %s", pInfo->dbName); + TAOS_RES *pSql = taos_query(con, qstr); + taos_free_result(pSql); + + int64_t startMs = taosGetTimestampMs(); + + for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + sprintf(qstr, "create table if not exists %s%d using %s tags(%d)", stableName, t, stableName, t); + TAOS_RES *pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); + } + taos_free_result(pSql); + } + + float createTableSpeed = 0; + for (int i = 0; i < numOfThreads; ++i) { + createTableSpeed += pInfo[i].createTableSpeed; + } + + int64_t endMs = taosGetTimestampMs(); + int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; + float seconds = (endMs - startMs) / 1000.0; + float speed = totalTables / seconds; + pInfo->createTableSpeed = speed; + + pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed); + taos_close(con); + + return 0; +} + +void printHelp() { + char indent[10] = " "; + printf("Used to test the performance while create table\n"); + + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); + printf("%s%s\n", indent, "-d"); + printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", dbName); + printf("%s%s\n", indent, "-s"); + printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", stableName); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads); + printf("%s%s\n", indent, "-n"); + printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables); + printf("%s%s\n", indent, "-columns"); + printf("%s%s%s%d\n", indent, indent, "numOfColumns, default is ", numOfColumns); + printf("%s%s\n", indent, "-tables"); + printf("%s%s%s%d\n", indent, indent, "Database parameters tables, default is ", maxTables); + + exit(EXIT_SUCCESS); +} + +void shellParseArgument(int argc, char *argv[]) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-d") == 0) { + strcpy(dbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); + } else if (strcmp(argv[i], "-s") == 0) { + strcpy(stableName, argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0) { + numOfTables = atoi(argv[++i]); + } else if (strcmp(argv[i], "-tables") == 0) { + maxTables = atoi(argv[++i]); + } else if (strcmp(argv[i], "-columns") == 0) { + numOfColumns = atoi(argv[++i]); + } else { + } + } + + pPrint("%s dbName:%s %s", GREEN, dbName, NC); + pPrint("%s stableName:%s %s", GREEN, stableName, NC); + pPrint("%s configDir:%s %s", GREEN, configDir, NC); + pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC); + pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC); + pPrint("%s numOfColumns:%d %s", GREEN, numOfColumns, NC); + pPrint("%s dbPara maxTables:%d %s", GREEN, maxTables, NC); + + pPrint("%s start create table performace test %s", GREEN, NC); +} From 6d13cf37ef7b40c3343fc710d79aae0bf3ab9364 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 25 Jun 2020 13:15:45 +0000 Subject: [PATCH 11/26] failed to create table in taosdemo --- src/kit/taosdemo/taosdemo.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index e673277a0b..750fb2fd40 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -499,7 +499,7 @@ int main(int argc, char *argv[]) { /* Create all the tables; */ printf("Creating %d table(s)......\n", ntables); for (int i = 0; i < ntables; i++) { - snprintf(command, BUFFER_SIZE, "create table %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols); queryDB(taos, command); } @@ -509,7 +509,7 @@ int main(int argc, char *argv[]) { } else { /* Create metric table */ printf("Creating meters super table...\n"); - snprintf(command, BUFFER_SIZE, "create table %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); queryDB(taos, command); printf("meters created!\n"); @@ -523,9 +523,9 @@ int main(int argc, char *argv[]) { j = i % 10; } if (j % 2 == 0) { - snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai"); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai"); } else { - snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing"); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing"); } queryDB(taos, command); } From d0b208755d162963cfef14562bbba5892ee8030c Mon Sep 17 00:00:00 2001 From: dengyihao Date: Thu, 25 Jun 2020 21:27:26 +0800 Subject: [PATCH 12/26] coverity issue --- src/query/src/qExecutor.c | 58 +++++++++++++++++++++++-------------- src/query/src/qpercentile.c | 7 +++-- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d41bac2a49..ed529db92e 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -767,6 +767,9 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { + if (pDataBlock == NULL) { + return NULL; + } char *dataBlock = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -854,6 +857,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) { + tfree(sasArray); return; } @@ -1349,10 +1353,13 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { // the column may be the normal column, group by normal_column, the functionId is TSDB_FUNC_PRJ } } - - p->tagInfo.pTagCtxList = pTagCtx; - p->tagInfo.numOfTagCols = num; - p->tagInfo.tagsLen = tagLen; + if (p != NULL) { + p->tagInfo.pTagCtxList = pTagCtx; + p->tagInfo.numOfTagCols = num; + p->tagInfo.tagsLen = tagLen; + } else { + tfree(pTagCtx); + } } } @@ -3497,7 +3504,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde continue; } - assert(result[i].numOfRows >= 0 && pQInfo->offset <= 1); + assert(pQInfo->offset <= 1); int32_t numOfRowsToCopy = result[i].numOfRows - pQInfo->offset; int32_t oldOffset = pQInfo->offset; @@ -5295,9 +5302,9 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * bytes = s.bytes; } else{ int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); - assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags || j == TSDB_TBNAME_COLUMN_INDEX); + assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags); - if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) { + if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; type = pCol->type; bytes = pCol->bytes; @@ -5339,8 +5346,6 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * assert(ret == TSDB_CODE_SUCCESS); } } - - tfree(pExprMsg); *pExprInfo = pExprs; return TSDB_CODE_SUCCESS; @@ -5591,11 +5596,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQInfo->signature = pQInfo; pQInfo->tableGroupInfo = *pTableGroupInfo; - size_t numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); + size_t numOfGroups = 0; + if (pTableGroupInfo->pGroupList != NULL) { + numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); + + pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); + pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; + } - pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); - pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; - int tableIndex = 0; STimeWindow window = pQueryMsg->window; taosArraySort(pTableIdList, compareTableIdInfo); @@ -5693,7 +5701,8 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); tsBufResetPos(pTSBuf); - tsBufNextPos(pTSBuf); + bool ret = tsBufNextPos(pTSBuf); + UNUSED(ret); } // only the successful complete requries the sem_post/over = 1 operations. @@ -5839,18 +5848,23 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // make sure file exist if (FD_VALID(fd)) { - size_t s = lseek(fd, 0, SEEK_END); - qTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); - - lseek(fd, 0, SEEK_SET); - read(fd, data, s); + int32_t s = lseek(fd, 0, SEEK_END); + UNUSED(s); + qTrace("QInfo:%p ts comp data return, file:%s, size:%d", pQInfo, pQuery->sdata[0]->data, s); + s = lseek(fd, 0, SEEK_SET); + if (s >= 0) { + size_t sz = read(fd, data, s); + UNUSED(sz); + } close(fd); - unlink(pQuery->sdata[0]->data); } else { - // todo return the error code to client + // todo return the error code to client and handle invalid fd qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); + if (fd != -1) { + close(fd); + } } // all data returned, set query over @@ -5903,7 +5917,6 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi } if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { - free(pExprMsg); goto _over; } @@ -5975,6 +5988,7 @@ _over: } free(pTagColumnInfo); free(pExprs); + free(pExprMsg); taosArrayDestroy(pTableIdList); //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; diff --git a/src/query/src/qpercentile.c b/src/query/src/qpercentile.c index e192cf3873..9de4d3668c 100644 --- a/src/query/src/qpercentile.c +++ b/src/query/src/qpercentile.c @@ -880,8 +880,11 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - UNUSED(sz); - tMemBucketPut(pMemBucket, pPage->data, pPage->num); + if (sz != pMemBuffer->pageSize) { + uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path); + } else { + tMemBucketPut(pMemBucket, pPage->data, pPage->num); + } } fclose(pMemBuffer->file); From 103f6888a33c650017501fa3ad7d28bb33b20ccf Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 25 Jun 2020 13:27:55 +0000 Subject: [PATCH 13/26] scripts --- tests/script/general/parser/auto_create_tb.sim | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/script/general/parser/auto_create_tb.sim b/tests/script/general/parser/auto_create_tb.sim index 6065daa6d3..64fec4b56d 100644 --- a/tests/script/general/parser/auto_create_tb.sim +++ b/tests/script/general/parser/auto_create_tb.sim @@ -153,13 +153,13 @@ print $rows $data00 $data10 $data20 if $rows != 3 then return -1 endi -if $data00 != tb3 then +if $data00 != tb1 then return -1 endi if $data10 != tb2 then return -1 endi -if $data20 != tb1 then +if $data20 != tb3 then return -1 endi @@ -221,13 +221,13 @@ sql show tables if $rows != 3 then return -1 endi -if $data00 != tb3 then +if $data00 != tb1 then return -1 endi if $data10 != tb2 then return -1 endi -if $data20 != tb1 then +if $data20 != tb3 then return -1 endi From 03ff8f6519ebdc3f0a6becab208ceaf2b12defd8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 25 Jun 2020 23:45:24 +0800 Subject: [PATCH 14/26] [td-225] improve the data generation performance. --- src/kit/taosdemo/taosdemo.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index e673277a0b..fa7c4fb7af 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -847,10 +847,10 @@ void *syncWrite(void *sarg) { pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, tID); int k; for (k = 0; k < winfo->nrecords_per_request;) { - int rand_num = trand() % 100; + int rand_num = rand() % 100; int len = -1; if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) { - long d = tmp_time - trand() % 1000000 + rand_num; + long d = tmp_time - rand() % 1000000 + rand_num; len = generateData(data, data_type, ncols_per_record, d, len_of_binary); } else { len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary); @@ -942,10 +942,10 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name); for (int i = 0; i < tb_info->nrecords_per_request; i++) { - int rand_num = trand() % 100; + int rand_num = rand() % 100; if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate) { - long d = tmp_time - trand() % 1000000 + rand_num; + long d = tmp_time - rand() % 1000000 + rand_num; generateData(data, datatype, ncols_per_record, d, len_of_binary); } else { @@ -994,20 +994,20 @@ int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t times for (int i = 0; i < num_of_cols; i++) { if (strcasecmp(data_type[i % c], "tinyint") == 0) { - pstr += sprintf(pstr, ", %d", (int)(trand() % 128)); + pstr += sprintf(pstr, ", %d", (int)(rand() % 128)); } else if (strcasecmp(data_type[i % c], "smallint") == 0) { - pstr += sprintf(pstr, ", %d", (int)(trand() % 32767)); + pstr += sprintf(pstr, ", %d", (int)(rand() % 32767)); } else if (strcasecmp(data_type[i % c], "int") == 0) { - pstr += sprintf(pstr, ", %d", (int)(trand() % 10)); + pstr += sprintf(pstr, ", %d", (int)(rand() % 10)); } else if (strcasecmp(data_type[i % c], "bigint") == 0) { - pstr += sprintf(pstr, ", %" PRId64, trand() % 2147483648); + pstr += sprintf(pstr, ", %" PRId64, rand() % 2147483648); } else if (strcasecmp(data_type[i % c], "float") == 0) { - pstr += sprintf(pstr, ", %10.4f", (float)(trand() / 1000.0)); + pstr += sprintf(pstr, ", %10.4f", (float)(rand() / 1000.0)); } else if (strcasecmp(data_type[i % c], "double") == 0) { - double t = (double)(trand() / 1000000.0); + double t = (double)(rand() / 1000000.0); pstr += sprintf(pstr, ", %20.8f", t); } else if (strcasecmp(data_type[i % c], "bool") == 0) { - bool b = trand() & 1; + bool b = rand() & 1; pstr += sprintf(pstr, ", %s", b ? "true" : "false"); } else if (strcasecmp(data_type[i % c], "binary") == 0) { char s[len_of_binary]; @@ -1033,7 +1033,7 @@ void rand_string(char *str, int size) { --size; int n; for (n = 0; n < size; n++) { - int key = trand() % (int)(sizeof charset - 1); + int key = rand() % (int)(sizeof charset - 1); str[n] = charset[key]; } str[n] = 0; From 23ed5e24b0a9c2dd6a07d8f26bce0149587c42d8 Mon Sep 17 00:00:00 2001 From: Steven Li Date: Thu, 25 Jun 2020 22:17:18 -0700 Subject: [PATCH 15/26] Refactored crash_gen, extracted logic to create StateMachine class --- tests/pytest/crash_gen.py | 340 +++++++++++++++++++------------------- 1 file changed, 167 insertions(+), 173 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index bff2403bb8..cc41fd5e7d 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -216,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbManager.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -289,7 +289,7 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbManager().pickTaskType() # pick a task type for current state + taskType = self.getDbManager().getStateMachine().pickTaskType() # pick a task type for current state return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): @@ -691,18 +691,120 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) +class StateMechine : + def __init__(self, dbConn): + self._dbConn = dbConn + self._curState = self._findCurrentState() # starting state + self._stateWeights = [1,3,5,15] # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. + + def getCurrentState(self): + return self._curState + + # May be slow, use cautionsly... + def getTaskTypes(self): # those that can run (directly/indirectly) from the current state + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks + firstTaskTypes = [] + for tc in allTaskClasses: + # t = tc(self) # create task object + if tc.canBeginFrom(self._curState): + firstTaskTypes.append(tc) + # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones + taskTypes = firstTaskTypes.copy() # have to have these + for task1 in firstTaskTypes: # each task type gathered so far + endState = task1.getEndState() # figure the end state + if endState == None: # does not change end state + continue # no use, do nothing + for tc in allTaskClasses: # what task can further begin from there? + if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): + taskTypes.append(tc) # gather it + + if len(taskTypes) <= 0: + raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) + return taskTypes + + def _findCurrentState(self): + dbc = self._dbConn + ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state + if dbc.query("show databases") == 0 : # no database?! + # logger.debug("Found EMPTY state") + logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) + return StateEmpty() + dbc.execute("use db") # did not do this when openning connection + if dbc.query("show tables") == 0 : # no tables + # logger.debug("Found DB ONLY state") + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) + return StateDbOnly() + if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) ) == 0 : # no regular tables + # logger.debug("Found TABLE_ONLY state") + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + return StateSuperTableOnly() + else: # has actual tables + # logger.debug("Found HAS_DATA state") + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) + return StateHasData() + + def transition(self, tasks): + if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + return # do nothing + + self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps + + # Generic Checks, first based on the start state + if self._curState.canCreateDb(): + self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb) + # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops + + if self._curState.canDropDb(): + self._curState.assertIfExistThenSuccess(tasks, TaskDropDb) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop + + # if self._state.canCreateFixedTable(): + # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create + + # if self._state.canDropFixedTable(): + # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped + # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop + + # if self._state.canAddData(): + # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually + + # if self._state.canReadData(): + # Nothing for sure + + newState = self._findCurrentState() + logger.debug("[STT] New DB state determined: {}".format(newState)) + self._curState.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? + self._curState = newState + + def pickTaskType(self): + taskTypes = self.getTaskTypes() # all the task types we can choose from at curent state + weights = [] + for tt in taskTypes: + endState = tt.getEndState() + if endState != None : + weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method + else: + weights.append(10) # read data task, default to 10: TODO: change to a constant + i = self._weighted_choice_sub(weights) + # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + return taskTypes[i] + + def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ + rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? + for i, w in enumerate(weights): + rnd -= w + if rnd < 0: + return i # Manager of the Database Data/Connection -class DbManager(): - +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() self._lastTick = self.setupLastTick() # datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - - self._state = StateInvalid() # starting state - self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. # self.openDbServerConnection() self._dbConn = DbConn() @@ -710,7 +812,7 @@ class DbManager(): self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected except taos.error.ProgrammingError as err: # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) - if ( err.msg == 'disconnected' ): # cannot open DB connection + if ( err.msg == 'client disconnected' ): # cannot open DB connection print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") sys.exit() else: @@ -721,13 +823,17 @@ class DbManager(): if resetDb : self._dbConn.resetDb() # drop and recreate DB - self._state = self._findCurrentState() + self._stateMachine = StateMechine(self._dbConn) # Do this after dbConn is in proper shape + def getDbConn(self): return self._dbConn - def getState(self): - return self._state + def getStateMachine(self): + return self._stateMachine + + # def getState(self): + # return self._stateMachine.getCurrentState() # We aim to create a starting time tick, such that, whenever we run our test here once # We should be able to safely create 100,000 records, which will not have any repeated time stamp @@ -754,7 +860,8 @@ class DbManager(): tIndex = self.tableNumQueue.push() return tIndex - def getFixedSuperTableName(self): + @classmethod + def getFixedSuperTableName(cls): return "fs_table" def releaseTable(self, i): # return the table back, so others can use it @@ -786,122 +893,6 @@ class DbManager(): def cleanUp(self): self._dbConn.close() - # May be slow, use cautionsly... - def getTaskTypesAtState(self): - allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks - firstTaskTypes = [] - for tc in allTaskClasses: - # t = tc(self) # create task object - if tc.canBeginFrom(self._state): - firstTaskTypes.append(tc) - # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones - taskTypes = firstTaskTypes.copy() # have to have these - for task1 in firstTaskTypes: # each task type gathered so far - endState = task1.getEndState() # figure the end state - if endState == None: - continue - for tc in allTaskClasses: # what task can further begin from there? - if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): - taskTypes.append(tc) # gather it - - if len(taskTypes) <= 0: - raise RuntimeError("No suitable task types found for state: {}".format(self._state)) - logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes)) - return taskTypes - - # tasks.append(ReadFixedDataTask(self)) # always for everybody - # if ( self._state == self.STATE_EMPTY ): - # tasks.append(CreateDbTask(self)) - # tasks.append(CreateFixedTableTask(self)) - # elif ( self._state == self.STATE_DB_ONLY ): - # tasks.append(DropDbTask(self)) - # tasks.append(CreateFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # elif ( self._state == self.STATE_TABLE_ONLY ): - # tasks.append(DropFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - # tasks.append(DropFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # else: - # raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - # return tasks - - def pickTaskType(self): - taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state - weights = [] - for tt in taskTypes: - endState = tt.getEndState() - if endState != None : - weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method - else: - weights.append(10) # read data task, default to 10: TODO: change to a constant - i = self._weighted_choice_sub(weights) - # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) - return taskTypes[i] - - def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ - rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? - for i, w in enumerate(weights): - rnd -= w - if rnd < 0: - return i - - def _findCurrentState(self): - dbc = self._dbConn - ts = time.time() - if dbc.query("show databases") == 0 : # no database?! - # logger.debug("Found EMPTY state") - logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) - return StateEmpty() - dbc.execute("use db") # did not do this when openning connection - if dbc.query("show tables") == 0 : # no tables - # logger.debug("Found DB ONLY state") - logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) - return StateDbOnly() - if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data - # logger.debug("Found TABLE_ONLY state") - logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) - return StateSuperTableOnly() - else: - # logger.debug("Found HAS_DATA state") - logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) - return StateHasData() - - def transition(self, tasks): - if ( len(tasks) == 0 ): # before 1st step, or otherwise empty - return # do nothing - - self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps - - # Generic Checks, first based on the start state - if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, TaskCreateDb) - # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops - - if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, TaskDropDb) - # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop - - # if self._state.canCreateFixedTable(): - # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped - # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create - - # if self._state.canDropFixedTable(): - # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped - # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop - - # if self._state.canAddData(): - # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually - - # if self._state.canReadData(): - # Nothing for sure - - newState = self._findCurrentState() - logger.debug("[STT] New DB state determined: {}".format(newState)) - self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? - self._state = newState - class TaskExecutor(): def __init__(self, curStep): self._curStep = curStep @@ -928,7 +919,7 @@ class Task(): return Task.taskSn def __init__(self, dbManager: DbManager, execStats: ExecutionStats): - self._dbState = dbManager + self._dbManager = dbManager self._workerThread = None self._err = None self._curStep = None @@ -944,7 +935,7 @@ class Task(): return self._err == None def clone(self): # TODO: why do we need this again? - newTask = self.__class__(self._dbState, self._execStats) + newTask = self.__class__(self._dbManager, self._execStats) return newTask def logDebug(self, msg): @@ -980,7 +971,7 @@ class Task(): self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): - return self._dbState.execute(sql) + return self._dbManager.execute(sql) class ExecutionStats: @@ -1047,20 +1038,22 @@ class ExecutionStats: class StateTransitionTask(Task): - # @classmethod - # def getAllTaskClasses(cls): # static - # return cls.__subclasses__() @classmethod def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") + _endState = None + @classmethod + def getEndState(cls): # TODO: optimize by calling it fewer times + raise RuntimeError("Overriding method expected") + # @classmethod # def getBeginStates(cls): # return cls.getInfo()[0] - @classmethod - def getEndState(cls): # returning the class name - return cls.getInfo()[0] + # @classmethod + # def getEndState(cls): # returning the class name + # return cls.getInfo()[0] @classmethod def canBeginFrom(cls, state: AnyState): @@ -1070,15 +1063,10 @@ class StateTransitionTask(Task): def execute(self, wt: WorkerThread): super().execute(wt) - - class TaskCreateDb(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_EMPTY], # can begin from - StateDbOnly() # end state - ] + def getEndState(cls): + return StateDbOnly() @classmethod def canBeginFrom(cls, state: AnyState): @@ -1089,11 +1077,8 @@ class TaskCreateDb(StateTransitionTask): class TaskDropDb(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateEmpty() - ] + def getEndState(cls): + return StateEmpty() @classmethod def canBeginFrom(cls, state: AnyState): @@ -1105,36 +1090,30 @@ class TaskDropDb(StateTransitionTask): class TaskCreateSuperTable(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_DB_ONLY], - StateSuperTableOnly() - ] + def getEndState(cls): + return StateSuperTableOnly() @classmethod def canBeginFrom(cls, state: AnyState): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedSuperTableName() + tblName = self._dbManager.getFixedSuperTableName() wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that automatically class TaskReadData(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - None # meaning doesn't affect state - ] + def getEndState(cls): + return None # meaning doesn't affect state @classmethod def canBeginFrom(cls, state: AnyState): return state.canReadData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - sTbName = self._dbState.getFixedSuperTableName() + sTbName = self._dbManager.getFixedSuperTableName() dbc = wt.getDbConn() dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations @@ -1150,20 +1129,38 @@ class TaskReadData(StateTransitionTask): class TaskDropSuperTable(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateDbOnly() # meaning doesn't affect state - ] + def getEndState(cls): + return StateDbOnly() @classmethod def canBeginFrom(cls, state: AnyState): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedSuperTableName() + tblName = self._dbManager.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) +class TaskAlterTags(StateTransitionTask): + @classmethod + def getEndState(cls): + return None # meaning doesn't affect state + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropFixedSuperTable() # if we can drop it, we can alter tags + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbManager.getFixedSuperTableName() + dice = Dice.throw(4) + if dice == 0 : + wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) + elif dice == 1 : + wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) + elif dice == 2 : + wt.execSql("alter table db.{} drop tag newTag".format(tblName)) + else: # dice == 3 + wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) + class TaskAddData(StateTransitionTask): activeTable : Set[int] = set() # Track which table is being actively worked on LARGE_NUMBER_OF_TABLES = 35 @@ -1186,18 +1183,15 @@ class TaskAddData(StateTransitionTask): cls.fAddLogDone = open("add_log_done.txt", "w") @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateHasData() - ] + def getEndState(cls): + return StateHasData() @classmethod def canBeginFrom(cls, state: AnyState): return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState + ds = self._dbManager wt.execSql("use db") # TODO: seems to be an INSERT bug to require this tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) random.shuffle(tblSeq) From dc92ea0d8fc184d5d6aff8c065ded11932b8b4e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jun 2020 00:13:58 +0800 Subject: [PATCH 16/26] [td-225] update the test sim --- tests/script/general/parser/projection_limit_offset.sim | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/script/general/parser/projection_limit_offset.sim b/tests/script/general/parser/projection_limit_offset.sim index 5f006d0eb7..2b89946ef8 100644 --- a/tests/script/general/parser/projection_limit_offset.sim +++ b/tests/script/general/parser/projection_limit_offset.sim @@ -80,6 +80,7 @@ print $rows sql select ts from group_mt0 where ts>='1970-1-1 8:1:43' and ts<='1970-1-1 8:1:43.500' limit 8000 offset 0; if $rows != 4008 then + print expect 4008, actual:$rows return -1 endi From 4c97427e0cf7e6cdc86f358428de2f42c6dc1b27 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jun 2020 00:15:13 +0800 Subject: [PATCH 17/26] [td-225] fix bugs in parse time --- src/common/src/ttimezone.c | 1 + src/util/src/ttime.c | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/common/src/ttimezone.c b/src/common/src/ttimezone.c index ae6ffea59a..ed62357c4d 100644 --- a/src/common/src/ttimezone.c +++ b/src/common/src/ttimezone.c @@ -20,6 +20,7 @@ #include "tconfig.h" #include "tutil.h" +// TODO refactor to set the tz value through parameter void tsSetTimeZone() { SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone"); uPrint("timezone is set to %s by %s", tsTimezone, tsCfgStatusStr[cfg_timezone->cfgStatus]); diff --git a/src/util/src/ttime.c b/src/util/src/ttime.c index dfec632012..176f9be7fb 100644 --- a/src/util/src/ttime.c +++ b/src/util/src/ttime.c @@ -48,23 +48,21 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0, const unsigned int day, const unsigned int hour, const unsigned int min, const unsigned int sec) { - unsigned int mon = mon0, year = year0; + unsigned int mon = mon0, year = year0; - /* 1..12 -> 11,12,1..10 */ - if (0 >= (int) (mon -= 2)) { - mon += 12; /* Puts Feb last since it has leap day */ - year -= 1; - } + /* 1..12 -> 11,12,1..10 */ + if (0 >= (int) (mon -= 2)) { + mon += 12; /* Puts Feb last since it has leap day */ + year -= 1; + } - //int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) + - // year*365 - 719499)*24 + hour)*60 + min)*60 + sec); - int64_t res; - res = 367*((int64_t)mon)/12; - res += year/4 - year/100 + year/400 + day + year*365 - 719499; + int64_t res = 367*((int64_t)mon)/12; + + res += ((int64_t)(year/4 - year/100 + year/400 + day + year*365) - 719499); // this value may be less than 0 res = res*24; res = ((res + hour) * 60 + min) * 60 + sec; - return (res + timezone); + return (res + timezone); } // ==== mktime() kernel code =================// static int64_t m_deltaUtc = 0; From b5598b6300546e0a3cfde4c5076380ee71a5acd5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jun 2020 00:16:34 +0800 Subject: [PATCH 18/26] [td-225] opt performance for random number generation. --- src/util/src/tskiplist.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 2394adc6f3..f3c0babe6b 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -38,7 +38,7 @@ static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) { const uint32_t factor = 4; int32_t n = 1; - while ((taosRand() % factor) == 0 && n <= pSkipList->maxLevel) { + while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) { n++; } From b4a15d01409ec5683f52a8ff771fc123c657f57c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jun 2020 00:17:43 +0800 Subject: [PATCH 19/26] [td-225] enable the client unit test. --- src/client/CMakeLists.txt | 4 +++- src/client/tests/CMakeLists.txt | 15 +++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 src/client/tests/CMakeLists.txt diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index ac1894369d..00fa1a1479 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -34,7 +34,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) VERSION_INFO) MESSAGE(STATUS "build version ${VERSION_INFO}") SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${VERSION_INFO} SOVERSION 1) - + + ADD_SUBDIRECTORY(tests) + ELSEIF (TD_WINDOWS_64) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32) diff --git a/src/client/tests/CMakeLists.txt b/src/client/tests/CMakeLists.txt new file mode 100644 index 0000000000..f07af85e25 --- /dev/null +++ b/src/client/tests/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +FIND_PATH(HEADER_GTEST_INCLUDE_DIR gtest.h /usr/include/gtest /usr/local/include/gtest) +FIND_LIBRARY(LIB_GTEST_STATIC_DIR libgtest.a /usr/lib/ /usr/local/lib) + +IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR) + MESSAGE(STATUS "gTest library found, build unit test") + + INCLUDE_DIRECTORIES(${HEADER_GTEST_INCLUDE_DIR}) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + + ADD_EXECUTABLE(cliTest ${SOURCE_LIST}) + TARGET_LINK_LIBRARIES(cliTest taos tutil common gtest pthread) +ENDIF() \ No newline at end of file From 24a0210d51a108ce83cfae90791871f3a996f3b5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jun 2020 00:19:27 +0800 Subject: [PATCH 20/26] [td-225] add log for tsdb --- src/tsdb/src/tsdbMeta.c | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 6b0224cad2..e8da25d585 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -553,10 +553,18 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { return 0; } -void tsdbRefTable(STable *pTable) { T_REF_INC(pTable); } +void tsdbRefTable(STable *pTable) { + int16_t ref = T_REF_INC(pTable); + tsdbTrace("ref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref); +} void tsdbUnRefTable(STable *pTable) { - if (T_REF_DEC(pTable) == 0) { + int16_t ref = T_REF_DEC(pTable); + tsdbTrace("unref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref); + + if (ref == 0) { + tsdbTrace("destroy table:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid); + if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) { tsdbUnRefTable(pTable->pSuper); } From 5835964c15b4ee10f89a8fa8d81283f4c864d4c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 27 Jun 2020 01:33:47 +0800 Subject: [PATCH 21/26] [td-225] --- src/query/src/qExecutor.c | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ed529db92e..cdb56e1469 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -822,7 +822,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas } /** - * + * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv * @param forwardStep * @param tsCols @@ -1064,16 +1064,18 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; - - TSKEY *tsCols = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; - bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); + + SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); + + TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; + bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); int16_t type = 0; int16_t bytes = 0; char *groupbyColumnData = NULL; - if (groupbyStateValue) { + if (groupbyColumnValue) { groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); } @@ -1161,7 +1163,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS pWindowResInfo->curIndex = index; } else { // other queries // decide which group this rows belongs to according to current state value - if (groupbyStateValue) { + if (groupbyColumnValue) { char *val = groupbyColumnData + bytes * offset; int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes); @@ -1186,9 +1188,14 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } } - - item->lastKey = tsCols[offset] + step; - + + assert(offset >= 0); + if (tsCols != NULL) { + item->lastKey = tsCols[offset] + step; + } else { + item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step; + } + // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { From f5faa8061ed5203a898bc7ccf3e6100578cb145c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 27 Jun 2020 03:00:44 +0000 Subject: [PATCH 22/26] fix crash generate by crash_gen.sh -p -t 5 -s 100 --- src/mnode/src/mnodeDb.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 5ec5aebf14..81b703b740 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -968,6 +968,17 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { return mnodeAlterDb(pMsg->pDb, pAlter, pMsg); } +static int32_t mnodeDropDbCb(SMnodeMsg *pMsg, int32_t code) { + SDbObj *pDb = pMsg->pDb; + if (code != TSDB_CODE_SUCCESS) { + mError("db:%s, failed to drop from sdb, reason:%s", pDb->name, tstrerror(code)); + } else { + mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + } + + return code; +} + static int32_t mnodeDropDb(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; @@ -978,12 +989,12 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, .pObj = pDb, - .pMsg = pMsg + .pMsg = pMsg, + .cb = mnodeDropDbCb }; int32_t code = sdbDeleteRow(&oper); if (code == TSDB_CODE_SUCCESS) { - mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } From cc79eedd93dfa39e2f54863f4ab7acc271f9a0f7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 27 Jun 2020 03:14:37 +0000 Subject: [PATCH 23/26] invalid write while drop stable --- src/mnode/src/mnodeTable.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4906aeaeb0..88ed0e90eb 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -854,13 +854,15 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) { SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; - if (pTable != NULL) { - mLPrint("app:%p:%p, stable:%s, is dropped from sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, - tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { + mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); + } else { + mLPrint("app:%p:%p, stable:%s, is dropped from sdb", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); } return code; } + static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; @@ -899,12 +901,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { }; int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); - return code; - } else { + if (code == TSDB_CODE_SUCCESS) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } + return code; } static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { From 1be46ac76f8ca5c33e620ecd8e3df41e3b292f1d Mon Sep 17 00:00:00 2001 From: Steven Li Date: Fri, 26 Jun 2020 21:18:34 -0700 Subject: [PATCH 24/26] Now able to run without parameter, also refactored to run service binary --- tests/pytest/crash_gen.py | 137 +++++++++++++++++++++----------------- 1 file changed, 75 insertions(+), 62 deletions(-) diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index cc41fd5e7d..916e8904ff 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -686,7 +686,8 @@ class StateHasData(AnyState): self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) else: # should be STATE_HAS_DATA - self.assertNoTask(tasks, TaskDropDb) + if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one + self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) @@ -1295,7 +1296,67 @@ class LoggingFilter(logging.Filter): # return False return True +class MainExec: + @classmethod + def runClient(cls): + # resetDb = False # DEBUG only + # dbState = DbState(resetDb) # DBEUG only! + dbManager = DbManager() # Regular function + Dice.seed(0) # initial seeding of dice + thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + tc = ThreadCoordinator(thPool, dbManager) + tc.run() + tc.logStats() + dbManager.cleanUp() + + @classmethod + def runService(cls): + print("Running service...") + + @classmethod + def runTemp(cls): # for debugging purposes + # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix + # dbc = dbState.getDbConn() + # sTbName = dbState.getFixedSuperTableName() + # dbc.execute("create database if not exists db") + # if not dbState.getState().equals(StateEmpty()): + # dbc.execute("use db") + + # rTables = None + # try: # the super table may not exist + # sql = "select TBNAME from db.{}".format(sTbName) + # logger.info("Finding out tables in super table: {}".format(sql)) + # dbc.query(sql) # TODO: analyze result set later + # logger.info("Fetching result") + # rTables = dbc.getQueryResult() + # logger.info("Result: {}".format(rTables)) + # except taos.error.ProgrammingError as err: + # logger.info("Initial Super table OPS error: {}".format(err)) + + # # sys.exit() + # if ( not rTables == None): + # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + # try: + # for rTbName in rTables : # regular tables + # ds = dbState + # logger.info("Inserting into table: {}".format(rTbName[0])) + # sql = "insert into db.{} values ('{}', {});".format( + # rTbName[0], + # ds.getNextTick(), ds.getNextInt()) + # dbc.execute(sql) + # for rTbName in rTables : # regular tables + # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + # logger.info("Initial READING operation is successful") + # except taos.error.ProgrammingError as err: + # logger.info("Initial WRITE/READ error: {}".format(err)) + + # Sandbox testing code + # dbc = dbState.getDbConn() + # while True: + # rows = dbc.query("show databases") + # print("Rows: {}, time={}".format(rows, time.time())) + return def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -1308,24 +1369,27 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) + parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') + parser.add_argument('-e', '--run-tdengine', action='store_true', + help='Run TDengine service in foreground (default: false)') parser.add_argument('-l', '--larger-data', action='store_true', help='Write larger amount of data during write operations (default: false)') - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', + parser.add_argument('-p', '--per-thread-db-connection', action='store_false', help='Use a single shared db connection (default: false)') parser.add_argument('-r', '--record-ops', action='store_true', help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') - parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, help='Maximum number of steps to run (default: 100)') - parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, help='Number of threads to run (default: 10)') global gConfig gConfig = parser.parse_args() - if len(sys.argv) == 1: - parser.print_help() - sys.exit() + # if len(sys.argv) == 1: + # parser.print_help() + # sys.exit() global logger logger = logging.getLogger('CrashGen') @@ -1337,62 +1401,11 @@ def main(): ch = logging.StreamHandler() logger.addHandler(ch) - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! - dbManager = DbManager() # Regular function - Dice.seed(0) # initial seeding of dice - tc = ThreadCoordinator( - ThreadPool(gConfig.num_threads, gConfig.max_steps), - # WorkDispatcher(dbState), # Obsolete? - dbManager - ) + if gConfig.run_tdengine : # run server + MainExec.runService() + else : + MainExec.runClient() - # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix - # dbc = dbState.getDbConn() - # sTbName = dbState.getFixedSuperTableName() - # dbc.execute("create database if not exists db") - # if not dbState.getState().equals(StateEmpty()): - # dbc.execute("use db") - - # rTables = None - # try: # the super table may not exist - # sql = "select TBNAME from db.{}".format(sTbName) - # logger.info("Finding out tables in super table: {}".format(sql)) - # dbc.query(sql) # TODO: analyze result set later - # logger.info("Fetching result") - # rTables = dbc.getQueryResult() - # logger.info("Result: {}".format(rTables)) - # except taos.error.ProgrammingError as err: - # logger.info("Initial Super table OPS error: {}".format(err)) - - # # sys.exit() - # if ( not rTables == None): - # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - # try: - # for rTbName in rTables : # regular tables - # ds = dbState - # logger.info("Inserting into table: {}".format(rTbName[0])) - # sql = "insert into db.{} values ('{}', {});".format( - # rTbName[0], - # ds.getNextTick(), ds.getNextInt()) - # dbc.execute(sql) - # for rTbName in rTables : # regular tables - # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure - # logger.info("Initial READING operation is successful") - # except taos.error.ProgrammingError as err: - # logger.info("Initial WRITE/READ error: {}".format(err)) - - - - # Sandbox testing code - # dbc = dbState.getDbConn() - # while True: - # rows = dbc.query("show databases") - # print("Rows: {}, time={}".format(rows, time.time())) - - tc.run() - tc.logStats() - dbManager.cleanUp() # logger.info("Crash_Gen execution finished") From 88e2c1e18af1ec4567f8c36499f4929aec2280de Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 27 Jun 2020 08:31:24 +0000 Subject: [PATCH 25/26] memory leak if vnode not exist --- src/dnode/src/dnodeVRead.c | 1 + src/dnode/src/dnodeVWrite.c | 1 + src/rpc/src/rpcMain.c | 2 ++ src/rpc/src/rpcTcp.c | 2 ++ src/rpc/src/rpcUdp.c | 2 ++ 5 files changed, 8 insertions(+) diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index cd18ae6dda..3027a94411 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -131,6 +131,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { .msgType = 0 }; rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); } } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index e61364355d..e2cc2d1cd3 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -119,6 +119,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { .msgType = 0 }; rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); } } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index e9ddd89467..5f30d27aeb 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -331,6 +331,7 @@ void rpcFreeCont(void *cont) { if ( cont ) { char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); free(temp); + // tTrace("free mem: %p", temp); } } @@ -540,6 +541,7 @@ static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); + // tTrace("free mem: %p", temp); } } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 82168f0b0e..c21a1e04df 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -418,6 +418,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { if ( NULL == buffer) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; + } else { + // tTrace("malloc mem: %p", buffer); } msg = buffer + tsRpcOverhead; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index a4c7d6c145..e92168c46a 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -211,6 +211,8 @@ static void *taosRecvUdpData(void *param) { if (NULL == tmsg) { tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen); continue; + } else { + // tTrace("malloc mem: %p", tmsg); } tmsg += tsRpcOverhead; // overhead for SRpcReqContext From e7f5185b85b33b446a20dfb342128547ff34d2bb Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 27 Jun 2020 13:18:06 +0000 Subject: [PATCH 26/26] memory leak for redirection case --- src/dnode/src/dnodeMPeer.c | 1 + src/dnode/src/dnodeMRead.c | 1 + src/dnode/src/dnodeMWrite.c | 1 + src/dnode/src/dnodeShell.c | 6 +++--- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index c37b2bed79..92fb1bd417 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -113,6 +113,7 @@ void dnodeFreeMnodePqueue() { void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMPeerQueue == NULL) { dnodeSendRedirectMsg(pMsg, false); + rpcFreeCont(pMsg->pCont); return; } diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index cb02ffbb1d..430f0daa59 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -116,6 +116,7 @@ void dnodeFreeMnodeRqueue() { void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMReadQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); + rpcFreeCont(pMsg->pCont); return; } diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 47645ea5ea..35657a6e45 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -115,6 +115,7 @@ void dnodeFreeMnodeWqueue() { void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); + rpcFreeCont(pMsg->pCont); return; } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 4252e63f8d..1e0f1a6415 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -38,9 +38,9 @@ static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue; // the following message shall be treated as mnode write