diff --git a/src/client/CMakeLists.txt b/src/client/CMakeLists.txt index ac1894369d..00fa1a1479 100644 --- a/src/client/CMakeLists.txt +++ b/src/client/CMakeLists.txt @@ -34,7 +34,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) VERSION_INFO) MESSAGE(STATUS "build version ${VERSION_INFO}") SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${VERSION_INFO} SOVERSION 1) - + + ADD_SUBDIRECTORY(tests) + ELSEIF (TD_WINDOWS_64) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index df4ccca9bc..1db4108d22 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1351,6 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) { int32_t code = TSDB_CODE_SUCCESS; SSqlCmd *pCmd = &pSql->cmd; + pSql->res.numOfRows = 0; assert(pCmd->numOfClause == 1); STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta; @@ -1394,6 +1395,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { fclose(fp); pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); return; } @@ -1458,8 +1460,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) { free(line); if (count > 0) { - if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) { + code = doPackSendDataBlock(pSql, count, pTableDataBlock); + if (code != TSDB_CODE_SUCCESS) { pParentSql->res.code = code; + tscQueueAsyncRes(pParentSql); + return; } } else { diff --git a/src/client/tests/CMakeLists.txt b/src/client/tests/CMakeLists.txt new file mode 100644 index 0000000000..f07af85e25 --- /dev/null +++ b/src/client/tests/CMakeLists.txt @@ -0,0 +1,15 @@ +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) +PROJECT(TDengine) + +FIND_PATH(HEADER_GTEST_INCLUDE_DIR gtest.h /usr/include/gtest /usr/local/include/gtest) +FIND_LIBRARY(LIB_GTEST_STATIC_DIR libgtest.a /usr/lib/ /usr/local/lib) + +IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR) + MESSAGE(STATUS "gTest library found, build unit test") + + INCLUDE_DIRECTORIES(${HEADER_GTEST_INCLUDE_DIR}) + AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + + ADD_EXECUTABLE(cliTest ${SOURCE_LIST}) + TARGET_LINK_LIBRARIES(cliTest taos tutil common gtest pthread) +ENDIF() \ No newline at end of file diff --git a/src/common/src/ttimezone.c b/src/common/src/ttimezone.c index ae6ffea59a..ed62357c4d 100644 --- a/src/common/src/ttimezone.c +++ b/src/common/src/ttimezone.c @@ -20,6 +20,7 @@ #include "tconfig.h" #include "tutil.h" +// TODO refactor to set the tz value through parameter void tsSetTimeZone() { SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone"); uPrint("timezone is set to %s by %s", tsTimezone, tsCfgStatusStr[cfg_timezone->cfgStatus]); diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index c37b2bed79..92fb1bd417 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -113,6 +113,7 @@ void dnodeFreeMnodePqueue() { void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMPeerQueue == NULL) { dnodeSendRedirectMsg(pMsg, false); + rpcFreeCont(pMsg->pCont); return; } diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index cb02ffbb1d..430f0daa59 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -116,6 +116,7 @@ void dnodeFreeMnodeRqueue() { void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMReadQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); + rpcFreeCont(pMsg->pCont); return; } diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 47645ea5ea..35657a6e45 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -115,6 +115,7 @@ void dnodeFreeMnodeWqueue() { void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) { dnodeSendRedirectMsg(pMsg, true); + rpcFreeCont(pMsg->pCont); return; } diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 4252e63f8d..1e0f1a6415 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -38,9 +38,9 @@ static int32_t tsDnodeQueryReqNum = 0; static int32_t tsDnodeSubmitReqNum = 0; int32_t dnodeInitShell() { - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue; // the following message shall be treated as mnode write diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index eefd9f0c00..ecf78edfd5 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -354,7 +354,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_DEFAULT_DBS_HASH_SIZE 100 #define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100 #define TSDB_DEFAULT_STABLES_HASH_SIZE 100 -#define TSDB_DEFAULT_CTABLES_HASH_SIZE 10000 +#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000 #define TSDB_PORT_DNODESHELL 0 #define TSDB_PORT_DNODEDNODE 5 diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index e673277a0b..899bed3b46 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -499,7 +499,7 @@ int main(int argc, char *argv[]) { /* Create all the tables; */ printf("Creating %d table(s)......\n", ntables); for (int i = 0; i < ntables; i++) { - snprintf(command, BUFFER_SIZE, "create table %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols); queryDB(taos, command); } @@ -509,7 +509,7 @@ int main(int argc, char *argv[]) { } else { /* Create metric table */ printf("Creating meters super table...\n"); - snprintf(command, BUFFER_SIZE, "create table %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols); queryDB(taos, command); printf("meters created!\n"); @@ -523,9 +523,9 @@ int main(int argc, char *argv[]) { j = i % 10; } if (j % 2 == 0) { - snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai"); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai"); } else { - snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing"); + snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing"); } queryDB(taos, command); } @@ -847,10 +847,10 @@ void *syncWrite(void *sarg) { pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, tID); int k; for (k = 0; k < winfo->nrecords_per_request;) { - int rand_num = trand() % 100; + int rand_num = rand() % 100; int len = -1; if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) { - long d = tmp_time - trand() % 1000000 + rand_num; + long d = tmp_time - rand() % 1000000 + rand_num; len = generateData(data, data_type, ncols_per_record, d, len_of_binary); } else { len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary); @@ -942,10 +942,10 @@ void callBack(void *param, TAOS_RES *res, int code) { pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name); for (int i = 0; i < tb_info->nrecords_per_request; i++) { - int rand_num = trand() % 100; + int rand_num = rand() % 100; if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate) { - long d = tmp_time - trand() % 1000000 + rand_num; + long d = tmp_time - rand() % 1000000 + rand_num; generateData(data, datatype, ncols_per_record, d, len_of_binary); } else { @@ -994,20 +994,20 @@ int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t times for (int i = 0; i < num_of_cols; i++) { if (strcasecmp(data_type[i % c], "tinyint") == 0) { - pstr += sprintf(pstr, ", %d", (int)(trand() % 128)); + pstr += sprintf(pstr, ", %d", (int)(rand() % 128)); } else if (strcasecmp(data_type[i % c], "smallint") == 0) { - pstr += sprintf(pstr, ", %d", (int)(trand() % 32767)); + pstr += sprintf(pstr, ", %d", (int)(rand() % 32767)); } else if (strcasecmp(data_type[i % c], "int") == 0) { - pstr += sprintf(pstr, ", %d", (int)(trand() % 10)); + pstr += sprintf(pstr, ", %d", (int)(rand() % 10)); } else if (strcasecmp(data_type[i % c], "bigint") == 0) { - pstr += sprintf(pstr, ", %" PRId64, trand() % 2147483648); + pstr += sprintf(pstr, ", %" PRId64, rand() % 2147483648); } else if (strcasecmp(data_type[i % c], "float") == 0) { - pstr += sprintf(pstr, ", %10.4f", (float)(trand() / 1000.0)); + pstr += sprintf(pstr, ", %10.4f", (float)(rand() / 1000.0)); } else if (strcasecmp(data_type[i % c], "double") == 0) { - double t = (double)(trand() / 1000000.0); + double t = (double)(rand() / 1000000.0); pstr += sprintf(pstr, ", %20.8f", t); } else if (strcasecmp(data_type[i % c], "bool") == 0) { - bool b = trand() & 1; + bool b = rand() & 1; pstr += sprintf(pstr, ", %s", b ? "true" : "false"); } else if (strcasecmp(data_type[i % c], "binary") == 0) { char s[len_of_binary]; @@ -1033,7 +1033,7 @@ void rand_string(char *str, int size) { --size; int n; for (n = 0; n < size; n++) { - int key = trand() % (int)(sizeof charset - 1); + int key = rand() % (int)(sizeof charset - 1); str[n] = charset[key]; } str[n] = 0; diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 5ec5aebf14..81b703b740 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -968,6 +968,17 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) { return mnodeAlterDb(pMsg->pDb, pAlter, pMsg); } +static int32_t mnodeDropDbCb(SMnodeMsg *pMsg, int32_t code) { + SDbObj *pDb = pMsg->pDb; + if (code != TSDB_CODE_SUCCESS) { + mError("db:%s, failed to drop from sdb, reason:%s", pDb->name, tstrerror(code)); + } else { + mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); + } + + return code; +} + static int32_t mnodeDropDb(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; @@ -978,12 +989,12 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { .type = SDB_OPER_GLOBAL, .table = tsDbSdb, .pObj = pDb, - .pMsg = pMsg + .pMsg = pMsg, + .cb = mnodeDropDbCb }; int32_t code = sdbDeleteRow(&oper); if (code == TSDB_CODE_SUCCESS) { - mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index b3dae1c5d4..a9d450aff0 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -250,7 +250,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code)); } -static int32_t sdbForwardToPeer(SWalHead *pHead) { + static int32_t sdbForwardToPeer(SWalHead *pHead) { if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS; int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC); @@ -782,7 +782,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { pTable->restoredFp = pDesc->restoredFp; _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); - if (pTable->keyType == SDB_KEY_STRING) { + if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) { hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); } pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true); diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 4906aeaeb0..88ed0e90eb 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -854,13 +854,15 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) { SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable; - if (pTable != NULL) { - mLPrint("app:%p:%p, stable:%s, is dropped from sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId, - tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { + mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); + } else { + mLPrint("app:%p:%p, stable:%s, is dropped from sdb", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId); } return code; } + static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; @@ -899,12 +901,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { }; int32_t code = sdbDeleteRow(&oper); - if (code != TSDB_CODE_SUCCESS) { - mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId); - return code; - } else { + if (code == TSDB_CODE_SUCCESS) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } + return code; } static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d41bac2a49..cdb56e1469 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -767,6 +767,9 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) { static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size, SArray *pDataBlock) { + if (pDataBlock == NULL) { + return NULL; + } char *dataBlock = NULL; SQuery *pQuery = pRuntimeEnv->pQuery; @@ -819,7 +822,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas } /** - * + * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv * @param forwardStep * @param tsCols @@ -854,6 +857,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) { + tfree(sasArray); return; } @@ -1060,16 +1064,18 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo* item = pQuery->current; - - TSKEY *tsCols = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData; - bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); + + SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0); + + TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL; + bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr); SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport)); int16_t type = 0; int16_t bytes = 0; char *groupbyColumnData = NULL; - if (groupbyStateValue) { + if (groupbyColumnValue) { groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock); } @@ -1157,7 +1163,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS pWindowResInfo->curIndex = index; } else { // other queries // decide which group this rows belongs to according to current state value - if (groupbyStateValue) { + if (groupbyColumnValue) { char *val = groupbyColumnData + bytes * offset; int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes); @@ -1182,9 +1188,14 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } } - - item->lastKey = tsCols[offset] + step; - + + assert(offset >= 0); + if (tsCols != NULL) { + item->lastKey = tsCols[offset] + step; + } else { + item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step; + } + // todo refactor: extract method for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) { @@ -1349,10 +1360,13 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) { // the column may be the normal column, group by normal_column, the functionId is TSDB_FUNC_PRJ } } - - p->tagInfo.pTagCtxList = pTagCtx; - p->tagInfo.numOfTagCols = num; - p->tagInfo.tagsLen = tagLen; + if (p != NULL) { + p->tagInfo.pTagCtxList = pTagCtx; + p->tagInfo.numOfTagCols = num; + p->tagInfo.tagsLen = tagLen; + } else { + tfree(pTagCtx); + } } } @@ -3497,7 +3511,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde continue; } - assert(result[i].numOfRows >= 0 && pQInfo->offset <= 1); + assert(pQInfo->offset <= 1); int32_t numOfRowsToCopy = result[i].numOfRows - pQInfo->offset; int32_t oldOffset = pQInfo->offset; @@ -5295,9 +5309,9 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * bytes = s.bytes; } else{ int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols); - assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags || j == TSDB_TBNAME_COLUMN_INDEX); + assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags); - if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) { + if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) { SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j]; type = pCol->type; bytes = pCol->bytes; @@ -5339,8 +5353,6 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo * assert(ret == TSDB_CODE_SUCCESS); } } - - tfree(pExprMsg); *pExprInfo = pExprs; return TSDB_CODE_SUCCESS; @@ -5591,11 +5603,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList, pQInfo->signature = pQInfo; pQInfo->tableGroupInfo = *pTableGroupInfo; - size_t numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); + size_t numOfGroups = 0; + if (pTableGroupInfo->pGroupList != NULL) { + numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList); + + pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); + pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; + } - pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); - pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; - int tableIndex = 0; STimeWindow window = pQueryMsg->window; taosArraySort(pTableIdList, compareTableIdInfo); @@ -5693,7 +5708,8 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder); tsBufResetPos(pTSBuf); - tsBufNextPos(pTSBuf); + bool ret = tsBufNextPos(pTSBuf); + UNUSED(ret); } // only the successful complete requries the sem_post/over = 1 operations. @@ -5839,18 +5855,23 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // make sure file exist if (FD_VALID(fd)) { - size_t s = lseek(fd, 0, SEEK_END); - qTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s); - - lseek(fd, 0, SEEK_SET); - read(fd, data, s); + int32_t s = lseek(fd, 0, SEEK_END); + UNUSED(s); + qTrace("QInfo:%p ts comp data return, file:%s, size:%d", pQInfo, pQuery->sdata[0]->data, s); + s = lseek(fd, 0, SEEK_SET); + if (s >= 0) { + size_t sz = read(fd, data, s); + UNUSED(sz); + } close(fd); - unlink(pQuery->sdata[0]->data); } else { - // todo return the error code to client + // todo return the error code to client and handle invalid fd qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo, pQuery->sdata[0]->data, strerror(errno)); + if (fd != -1) { + close(fd); + } } // all data returned, set query over @@ -5903,7 +5924,6 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi } if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) { - free(pExprMsg); goto _over; } @@ -5975,6 +5995,7 @@ _over: } free(pTagColumnInfo); free(pExprs); + free(pExprMsg); taosArrayDestroy(pTableIdList); //pQInfo already freed in initQInfo, but *pQInfo may not pointer to null; diff --git a/src/query/src/qpercentile.c b/src/query/src/qpercentile.c index e192cf3873..9de4d3668c 100644 --- a/src/query/src/qpercentile.c +++ b/src/query/src/qpercentile.c @@ -880,8 +880,11 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - UNUSED(sz); - tMemBucketPut(pMemBucket, pPage->data, pPage->num); + if (sz != pMemBuffer->pageSize) { + uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path); + } else { + tMemBucketPut(pMemBucket, pPage->data, pPage->num); + } } fclose(pMemBuffer->file); diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index e9ddd89467..5f30d27aeb 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -331,6 +331,7 @@ void rpcFreeCont(void *cont) { if ( cont ) { char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); free(temp); + // tTrace("free mem: %p", temp); } } @@ -540,6 +541,7 @@ static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); + // tTrace("free mem: %p", temp); } } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 82168f0b0e..c21a1e04df 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -418,6 +418,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { if ( NULL == buffer) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; + } else { + // tTrace("malloc mem: %p", buffer); } msg = buffer + tsRpcOverhead; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index a4c7d6c145..e92168c46a 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -211,6 +211,8 @@ static void *taosRecvUdpData(void *param) { if (NULL == tmsg) { tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen); continue; + } else { + // tTrace("malloc mem: %p", tmsg); } tmsg += tsRpcOverhead; // overhead for SRpcReqContext diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 6b0224cad2..e8da25d585 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -553,10 +553,18 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { return 0; } -void tsdbRefTable(STable *pTable) { T_REF_INC(pTable); } +void tsdbRefTable(STable *pTable) { + int16_t ref = T_REF_INC(pTable); + tsdbTrace("ref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref); +} void tsdbUnRefTable(STable *pTable) { - if (T_REF_DEC(pTable) == 0) { + int16_t ref = T_REF_DEC(pTable); + tsdbTrace("unref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref); + + if (ref == 0) { + tsdbTrace("destroy table:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid); + if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) { tsdbUnRefTable(pTable->pSuper); } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index 2394adc6f3..f3c0babe6b 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -38,7 +38,7 @@ static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) { const uint32_t factor = 4; int32_t n = 1; - while ((taosRand() % factor) == 0 && n <= pSkipList->maxLevel) { + while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) { n++; } diff --git a/src/util/src/ttime.c b/src/util/src/ttime.c index dfec632012..176f9be7fb 100644 --- a/src/util/src/ttime.c +++ b/src/util/src/ttime.c @@ -48,23 +48,21 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0, const unsigned int day, const unsigned int hour, const unsigned int min, const unsigned int sec) { - unsigned int mon = mon0, year = year0; + unsigned int mon = mon0, year = year0; - /* 1..12 -> 11,12,1..10 */ - if (0 >= (int) (mon -= 2)) { - mon += 12; /* Puts Feb last since it has leap day */ - year -= 1; - } + /* 1..12 -> 11,12,1..10 */ + if (0 >= (int) (mon -= 2)) { + mon += 12; /* Puts Feb last since it has leap day */ + year -= 1; + } - //int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) + - // year*365 - 719499)*24 + hour)*60 + min)*60 + sec); - int64_t res; - res = 367*((int64_t)mon)/12; - res += year/4 - year/100 + year/400 + day + year*365 - 719499; + int64_t res = 367*((int64_t)mon)/12; + + res += ((int64_t)(year/4 - year/100 + year/400 + day + year*365) - 719499); // this value may be less than 0 res = res*24; res = ((res + hour) * 60 + min) * 60 + sec; - return (res + timezone); + return (res + timezone); } // ==== mktime() kernel code =================// static int64_t m_deltaUtc = 0; diff --git a/tests/pytest/crash_gen.py b/tests/pytest/crash_gen.py index c88683aa09..916e8904ff 100755 --- a/tests/pytest/crash_gen.py +++ b/tests/pytest/crash_gen.py @@ -14,6 +14,7 @@ from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel import sys +import os import traceback # Require Python 3 if sys.version_info[0] < 3: @@ -32,6 +33,7 @@ import textwrap from typing import List from typing import Dict +from typing import Set from util.log import * from util.dnodes import * @@ -42,7 +44,10 @@ import crash_gen import taos # Global variables, tried to keep a small number. -gConfig = None # Command-line/Environment Configurations, will set a bit later + +# Command-line/Environment Configurations, will set a bit later +# ConfigNameSpace = argparse.Namespace +gConfig = argparse.Namespace() # Dummy value, will be replaced later logger = None def runThread(wt: WorkerThread): @@ -64,7 +69,7 @@ class WorkerThread: # self._curStep = -1 self._pool = pool self._tid = tid - self._tc = tc + self._tc = tc # type: ThreadCoordinator # self.threadIdent = threading.get_ident() self._thread = threading.Thread(target=runThread, args=(self,)) self._stepGate = threading.Event() @@ -156,13 +161,13 @@ class WorkerThread: if ( gConfig.per_thread_db_connection ): return self._dbConn.execute(sql) else: - return self._tc.getDbState().getDbConn().execute(sql) + return self._tc.getDbManager().getDbConn().execute(sql) def getDbConn(self): if ( gConfig.per_thread_db_connection ): return self._dbConn else: - return self._tc.getDbState().getDbConn() + return self._tc.getDbManager().getDbConn() # def querySql(self, sql): # not "execute", since we are out side the DB context # if ( gConfig.per_thread_db_connection ): @@ -171,12 +176,12 @@ class WorkerThread: # return self._tc.getDbState().getDbConn().query(sql) class ThreadCoordinator: - def __init__(self, pool, dbState): + def __init__(self, pool, dbManager): self._curStep = -1 # first step is 0 self._pool = pool # self._wd = wd self._te = None # prepare for every new step - self._dbState = dbState + self._dbManager = dbManager self._executedTasks: List[Task] = [] # in a given step self._lock = threading.RLock() # sync access for a few things @@ -186,8 +191,8 @@ class ThreadCoordinator: def getTaskExecutor(self): return self._te - def getDbState(self) -> DbState : - return self._dbState + def getDbManager(self) -> DbManager : + return self._dbManager def crossStepBarrier(self): self._stepBarrier.wait() @@ -211,7 +216,7 @@ class ThreadCoordinator: # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate" try: - self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state + self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state except taos.error.ProgrammingError as err: if ( err.msg == 'network unavailable' ): # broken DB connection logger.info("DB connection broken, execution failed") @@ -284,8 +289,8 @@ class ThreadCoordinator: # logger.debug(" (dice:{}/{}) ".format(i, nTasks)) # # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc. # return tasks[i].clone() # TODO: still necessary? - taskType = self.getDbState().pickTaskType() # pick a task type for current state - return taskType(self.getDbState(), self._execStats) # create a task from it + taskType = self.getDbManager().getStateMachine().pickTaskType() # pick a task type for current state + return taskType(self.getDbManager(), self._execStats) # create a task from it def resetExecutedTasks(self): self._executedTasks = [] # should be under single thread @@ -296,16 +301,12 @@ class ThreadCoordinator: # We define a class to run a number of threads in locking steps. class ThreadPool: - def __init__(self, dbState, numThreads, maxSteps, funcSequencer): + def __init__(self, numThreads, maxSteps): self.numThreads = numThreads self.maxSteps = maxSteps - self.funcSequencer = funcSequencer # Internal class variables - # self.dispatcher = WorkDispatcher(dbState) # Obsolete? self.curStep = 0 self.threadList = [] - # self.stepGate = threading.Condition() # Gate to hold/sync all threads - # self.numWaitingThreads = 0 # starting to run all the threads, in locking steps def createAndStartThreads(self, tc: ThreadCoordinator): @@ -319,7 +320,8 @@ class ThreadPool: logger.debug("Joining thread...") workerThread._thread.join() -# A queue of continguous POSITIVE integers +# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers +# for new table names class LinearQueue(): def __init__(self): self.firstIndex = 1 # 1st ever element @@ -595,9 +597,9 @@ class StateEmpty(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB - if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks - self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers + if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB + if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks + self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers class StateDbOnly(AnyState): def getInfo(self): @@ -609,20 +611,20 @@ class StateDbOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( not self.hasTask(tasks, CreateDbTask) ): - self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more - self.assertIfExistThenSuccess(tasks, DropDbTask) + if ( not self.hasTask(tasks, TaskCreateDb) ): + self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more + self.assertIfExistThenSuccess(tasks, TaskDropDb) # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases # Nothing to be said about adding data task - if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB + # if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB # self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess - self.assertAtMostOneSuccess(tasks, DropDbTask) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # self._state = self.STATE_EMPTY - elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success + if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success # self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table - if ( not self.hasTask(tasks, DropFixedSuperTableTask) ): - self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything - self.assertNoTask(tasks, DropDbTask) # should have have tried + if ( not self.hasTask(tasks, TaskDropSuperTable) ): + self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything + # self.assertNoTask(tasks, DropDbTask) # should have have tried # if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet # # can't say there's add-data attempts, since they may all fail # self._state = self.STATE_TABLE_ONLY @@ -645,8 +647,8 @@ class StateSuperTableOnly(AnyState): ] def verifyTasksToState(self, tasks, newState): - if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table - self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) + if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table + self.assertAtMostOneSuccess(tasks, TaskDropSuperTable) # self._state = self.STATE_DB_ONLY # elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data # self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases @@ -670,35 +672,140 @@ class StateHasData(AnyState): def verifyTasksToState(self, tasks, newState): if ( newState.equals(AnyState.STATE_EMPTY) ): - self.hasSuccess(tasks, DropDbTask) - self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy + self.hasSuccess(tasks, TaskDropDb) + if ( not self.hasTask(tasks, TaskCreateDb) ) : + self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only - if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task - self.assertNoTask(tasks, DropDbTask) # we must have drop_db task - self.hasSuccess(tasks, DropFixedSuperTableTask) + if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task + self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task + self.hasSuccess(tasks, TaskDropSuperTable) # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertNoTask(tasks, AddFixedDataTask) + self.assertNoTask(tasks, TaskDropDb) + self.assertNoTask(tasks, TaskDropSuperTable) + self.assertNoTask(tasks, TaskAddData) # self.hasSuccess(tasks, DeleteDataTasks) - else: - self.assertNoTask(tasks, DropDbTask) - self.assertNoTask(tasks, DropFixedSuperTableTask) - self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) + else: # should be STATE_HAS_DATA + if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one + self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it + if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table + self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it + # self.assertIfExistThenSuccess(tasks, ReadFixedDataTask) +class StateMechine : + def __init__(self, dbConn): + self._dbConn = dbConn + self._curState = self._findCurrentState() # starting state + self._stateWeights = [1,3,5,15] # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. + + def getCurrentState(self): + return self._curState -# State of the database as we believe it to be -class DbState(): - + # May be slow, use cautionsly... + def getTaskTypes(self): # those that can run (directly/indirectly) from the current state + allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks + firstTaskTypes = [] + for tc in allTaskClasses: + # t = tc(self) # create task object + if tc.canBeginFrom(self._curState): + firstTaskTypes.append(tc) + # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones + taskTypes = firstTaskTypes.copy() # have to have these + for task1 in firstTaskTypes: # each task type gathered so far + endState = task1.getEndState() # figure the end state + if endState == None: # does not change end state + continue # no use, do nothing + for tc in allTaskClasses: # what task can further begin from there? + if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): + taskTypes.append(tc) # gather it + + if len(taskTypes) <= 0: + raise RuntimeError("No suitable task types found for state: {}".format(self._curState)) + logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes)) + return taskTypes + + def _findCurrentState(self): + dbc = self._dbConn + ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state + if dbc.query("show databases") == 0 : # no database?! + # logger.debug("Found EMPTY state") + logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) + return StateEmpty() + dbc.execute("use db") # did not do this when openning connection + if dbc.query("show tables") == 0 : # no tables + # logger.debug("Found DB ONLY state") + logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) + return StateDbOnly() + if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) ) == 0 : # no regular tables + # logger.debug("Found TABLE_ONLY state") + logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) + return StateSuperTableOnly() + else: # has actual tables + # logger.debug("Found HAS_DATA state") + logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) + return StateHasData() + + def transition(self, tasks): + if ( len(tasks) == 0 ): # before 1st step, or otherwise empty + return # do nothing + + self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps + + # Generic Checks, first based on the start state + if self._curState.canCreateDb(): + self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb) + # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops + + if self._curState.canDropDb(): + self._curState.assertIfExistThenSuccess(tasks, TaskDropDb) + # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop + + # if self._state.canCreateFixedTable(): + # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped + # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create + + # if self._state.canDropFixedTable(): + # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped + # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop + + # if self._state.canAddData(): + # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually + + # if self._state.canReadData(): + # Nothing for sure + + newState = self._findCurrentState() + logger.debug("[STT] New DB state determined: {}".format(newState)) + self._curState.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? + self._curState = newState + + def pickTaskType(self): + taskTypes = self.getTaskTypes() # all the task types we can choose from at curent state + weights = [] + for tt in taskTypes: + endState = tt.getEndState() + if endState != None : + weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method + else: + weights.append(10) # read data task, default to 10: TODO: change to a constant + i = self._weighted_choice_sub(weights) + # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) + return taskTypes[i] + + def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ + rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? + for i, w in enumerate(weights): + rnd -= w + if rnd < 0: + return i + +# Manager of the Database Data/Connection +class DbManager(): def __init__(self, resetDb = True): self.tableNumQueue = LinearQueue() self._lastTick = self.setupLastTick() # datetime.datetime(2019, 1, 1) # initial date time tick self._lastInt = 0 # next one is initial integer self._lock = threading.RLock() - - self._state = StateInvalid() # starting state - self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc. # self.openDbServerConnection() self._dbConn = DbConn() @@ -706,7 +813,7 @@ class DbState(): self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected except taos.error.ProgrammingError as err: # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err)) - if ( err.msg == 'disconnected' ): # cannot open DB connection + if ( err.msg == 'client disconnected' ): # cannot open DB connection print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.") sys.exit() else: @@ -717,13 +824,17 @@ class DbState(): if resetDb : self._dbConn.resetDb() # drop and recreate DB - self._state = self._findCurrentState() + self._stateMachine = StateMechine(self._dbConn) # Do this after dbConn is in proper shape + def getDbConn(self): return self._dbConn - def getState(self): - return self._state + def getStateMachine(self): + return self._stateMachine + + # def getState(self): + # return self._stateMachine.getCurrentState() # We aim to create a starting time tick, such that, whenever we run our test here once # We should be able to safely create 100,000 records, which will not have any repeated time stamp @@ -750,7 +861,8 @@ class DbState(): tIndex = self.tableNumQueue.push() return tIndex - def getFixedSuperTableName(self): + @classmethod + def getFixedSuperTableName(cls): return "fs_table" def releaseTable(self, i): # return the table back, so others can use it @@ -782,122 +894,6 @@ class DbState(): def cleanUp(self): self._dbConn.close() - # May be slow, use cautionsly... - def getTaskTypesAtState(self): - allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks - firstTaskTypes = [] - for tc in allTaskClasses: - # t = tc(self) # create task object - if tc.canBeginFrom(self._state): - firstTaskTypes.append(tc) - # now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones - taskTypes = firstTaskTypes.copy() # have to have these - for task1 in firstTaskTypes: # each task type gathered so far - endState = task1.getEndState() # figure the end state - if endState == None: - continue - for tc in allTaskClasses: # what task can further begin from there? - if tc.canBeginFrom(endState) and (tc not in firstTaskTypes): - taskTypes.append(tc) # gather it - - if len(taskTypes) <= 0: - raise RuntimeError("No suitable task types found for state: {}".format(self._state)) - logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes)) - return taskTypes - - # tasks.append(ReadFixedDataTask(self)) # always for everybody - # if ( self._state == self.STATE_EMPTY ): - # tasks.append(CreateDbTask(self)) - # tasks.append(CreateFixedTableTask(self)) - # elif ( self._state == self.STATE_DB_ONLY ): - # tasks.append(DropDbTask(self)) - # tasks.append(CreateFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # elif ( self._state == self.STATE_TABLE_ONLY ): - # tasks.append(DropFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust - # tasks.append(DropFixedTableTask(self)) - # tasks.append(AddFixedDataTask(self)) - # else: - # raise RuntimeError("Unexpected DbState state: {}".format(self._state)) - # return tasks - - def pickTaskType(self): - taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state - weights = [] - for tt in taskTypes: - endState = tt.getEndState() - if endState != None : - weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method - else: - weights.append(10) # read data task, default to 10: TODO: change to a constant - i = self._weighted_choice_sub(weights) - # logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes))) - return taskTypes[i] - - def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/ - rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic? - for i, w in enumerate(weights): - rnd -= w - if rnd < 0: - return i - - def _findCurrentState(self): - dbc = self._dbConn - ts = time.time() - if dbc.query("show databases") == 0 : # no database?! - # logger.debug("Found EMPTY state") - logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time())) - return StateEmpty() - dbc.execute("use db") # did not do this when openning connection - if dbc.query("show tables") == 0 : # no tables - # logger.debug("Found DB ONLY state") - logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time())) - return StateDbOnly() - if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data - # logger.debug("Found TABLE_ONLY state") - logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time())) - return StateSuperTableOnly() - else: - # logger.debug("Found HAS_DATA state") - logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time())) - return StateHasData() - - def transition(self, tasks): - if ( len(tasks) == 0 ): # before 1st step, or otherwise empty - return # do nothing - - self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps - - # Generic Checks, first based on the start state - if self._state.canCreateDb(): - self._state.assertIfExistThenSuccess(tasks, CreateDbTask) - # self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops - - if self._state.canDropDb(): - self._state.assertIfExistThenSuccess(tasks, DropDbTask) - # self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop - - # if self._state.canCreateFixedTable(): - # self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped - # self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create - - # if self._state.canDropFixedTable(): - # self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped - # self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop - - # if self._state.canAddData(): - # self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually - - # if self._state.canReadData(): - # Nothing for sure - - newState = self._findCurrentState() - logger.debug("[STT] New DB state determined: {}".format(newState)) - self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks? - self._state = newState - class TaskExecutor(): def __init__(self, curStep): self._curStep = curStep @@ -923,8 +919,8 @@ class Task(): # logger.debug("Allocating taskSN: {}".format(Task.taskSn)) return Task.taskSn - def __init__(self, dbState: DbState, execStats: ExecutionStats): - self._dbState = dbState + def __init__(self, dbManager: DbManager, execStats: ExecutionStats): + self._dbManager = dbManager self._workerThread = None self._err = None self._curStep = None @@ -940,7 +936,7 @@ class Task(): return self._err == None def clone(self): # TODO: why do we need this again? - newTask = self.__class__(self._dbState, self._execStats) + newTask = self.__class__(self._dbManager, self._execStats) return newTask def logDebug(self, msg): @@ -966,7 +962,7 @@ class Task(): self._executeInternal(te, wt) # TODO: no return value? except taos.error.ProgrammingError as err: self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err)) - self._err = err + self._err = err except: self.logDebug("[=] Unexpected exception") raise @@ -976,7 +972,7 @@ class Task(): self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above. def execSql(self, sql): - return self._dbState.execute(sql) + return self._dbManager.execute(sql) class ExecutionStats: @@ -1043,20 +1039,22 @@ class ExecutionStats: class StateTransitionTask(Task): - # @classmethod - # def getAllTaskClasses(cls): # static - # return cls.__subclasses__() @classmethod def getInfo(cls): # each sub class should supply their own information raise RuntimeError("Overriding method expected") + _endState = None + @classmethod + def getEndState(cls): # TODO: optimize by calling it fewer times + raise RuntimeError("Overriding method expected") + # @classmethod # def getBeginStates(cls): # return cls.getInfo()[0] - @classmethod - def getEndState(cls): # returning the class name - return cls.getInfo()[0] + # @classmethod + # def getEndState(cls): # returning the class name + # return cls.getInfo()[0] @classmethod def canBeginFrom(cls, state: AnyState): @@ -1066,15 +1064,10 @@ class StateTransitionTask(Task): def execute(self, wt: WorkerThread): super().execute(wt) - - -class CreateDbTask(StateTransitionTask): +class TaskCreateDb(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_EMPTY], # can begin from - StateDbOnly() # end state - ] + def getEndState(cls): + return StateDbOnly() @classmethod def canBeginFrom(cls, state: AnyState): @@ -1083,13 +1076,10 @@ class CreateDbTask(StateTransitionTask): def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): wt.execSql("create database db") -class DropDbTask(StateTransitionTask): +class TaskDropDb(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateEmpty() - ] + def getEndState(cls): + return StateEmpty() @classmethod def canBeginFrom(cls, state: AnyState): @@ -1099,122 +1089,140 @@ class DropDbTask(StateTransitionTask): wt.execSql("drop database db") logger.debug("[OPS] database dropped at {}".format(time.time())) -class CreateFixedSuperTableTask(StateTransitionTask): +class TaskCreateSuperTable(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_DB_ONLY], - StateSuperTableOnly() - ] + def getEndState(cls): + return StateSuperTableOnly() @classmethod def canBeginFrom(cls, state: AnyState): return state.canCreateFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedSuperTableName() + tblName = self._dbManager.getFixedSuperTableName() wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName)) # No need to create the regular tables, INSERT will do that automatically -class ReadFixedDataTask(StateTransitionTask): +class TaskReadData(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - None # meaning doesn't affect state - ] + def getEndState(cls): + return None # meaning doesn't affect state @classmethod def canBeginFrom(cls, state: AnyState): return state.canReadData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - sTbName = self._dbState.getFixedSuperTableName() + sTbName = self._dbManager.getFixedSuperTableName() dbc = wt.getDbConn() dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later - rTables = dbc.getQueryResult() - # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - for rTbName in rTables : # regular tables - dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations + dbc.close() + dbc.open() + else: + rTables = dbc.getQueryResult() + # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + for rTbName in rTables : # regular tables + dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure # tdSql.query(" cars where tbname in ('carzero', 'carone')") -class DropFixedSuperTableTask(StateTransitionTask): +class TaskDropSuperTable(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateDbOnly() # meaning doesn't affect state - ] + def getEndState(cls): + return StateDbOnly() @classmethod def canBeginFrom(cls, state: AnyState): return state.canDropFixedSuperTable() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tblName = self._dbState.getFixedSuperTableName() + tblName = self._dbManager.getFixedSuperTableName() wt.execSql("drop table db.{}".format(tblName)) -class AddFixedDataTask(StateTransitionTask): +class TaskAlterTags(StateTransitionTask): @classmethod - def getInfo(cls): - return [ - # [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA], - StateHasData() - ] + def getEndState(cls): + return None # meaning doesn't affect state + + @classmethod + def canBeginFrom(cls, state: AnyState): + return state.canDropFixedSuperTable() # if we can drop it, we can alter tags + + def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): + tblName = self._dbManager.getFixedSuperTableName() + dice = Dice.throw(4) + if dice == 0 : + wt.execSql("alter table db.{} add tag extraTag int".format(tblName)) + elif dice == 1 : + wt.execSql("alter table db.{} drop tag extraTag".format(tblName)) + elif dice == 2 : + wt.execSql("alter table db.{} drop tag newTag".format(tblName)) + else: # dice == 3 + wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName)) + +class TaskAddData(StateTransitionTask): + activeTable : Set[int] = set() # Track which table is being actively worked on + LARGE_NUMBER_OF_TABLES = 35 + SMALL_NUMBER_OF_TABLES = 3 + LARGE_NUMBER_OF_RECORDS = 50 + SMALL_NUMBER_OF_RECORDS = 3 + + # We use these two files to record operations to DB, useful for power-off tests + fAddLogReady = None + fAddLogDone = None + + @classmethod + def prepToRecordOps(cls): + if gConfig.record_ops : + if ( cls.fAddLogReady == None ): + logger.info("Recording in a file operations to be performed...") + cls.fAddLogReady = open("add_log_ready.txt", "w") + if ( cls.fAddLogDone == None ): + logger.info("Recording in a file operations completed...") + cls.fAddLogDone = open("add_log_done.txt", "w") + + @classmethod + def getEndState(cls): + return StateHasData() @classmethod def canBeginFrom(cls, state: AnyState): return state.canAddData() def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState + ds = self._dbManager wt.execSql("use db") # TODO: seems to be an INSERT bug to require this - for i in range(10): # 0 to 9 - for j in range(10) : - sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format( - i, + tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)) + random.shuffle(tblSeq) + for i in tblSeq: + if ( i in self.activeTable ): # wow already active + # logger.info("Concurrent data insertion into table: {}".format(i)) + # print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table + print("x", end="", flush=True) + else: + self.activeTable.add(i) # marking it active + # No need to shuffle data sequence, unless later we decide to do non-increment insertion + for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table + nextInt = ds.getNextInt() + regTableName = "db.reg_table_{}".format(i) + if gConfig.record_ops: + self.prepToRecordOps() + self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName)) + self.fAddLogReady.flush() + os.fsync(self.fAddLogReady) + sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format( + regTableName, ds.getFixedSuperTableName(), ds.getNextBinary(), ds.getNextFloat(), - ds.getNextTick(), ds.getNextInt()) + ds.getNextTick(), nextInt) wt.execSql(sql) - - -#---------- Non State-Transition Related Tasks ----------# - -class CreateTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tIndex = self._dbState.addTable() - self.logDebug("Creating a table {} ...".format(tIndex)) - wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex)) - self.logDebug("Table {} created.".format(tIndex)) - self._dbState.releaseTable(tIndex) - -class DropTableTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - tableName = self._dbState.getTableNameToDelete() - if ( not tableName ): # May be "False" - self.logInfo("Cannot generate a table to delete, skipping...") - return - self.logInfo("Dropping a table db.{} ...".format(tableName)) - wt.execSql("drop table db.{}".format(tableName)) - - - -class AddDataTask(Task): - def _executeInternal(self, te: TaskExecutor, wt: WorkerThread): - ds = self._dbState - self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText())) - tIndex = ds.pickAndAllocateTable() - if ( tIndex == None ): - self.logInfo("No table found to add data, skipping...") - return - sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt()) - self.logDebug("[SQL] Executing SQL: {}".format(sql)) - wt.execSql(sql) - ds.releaseTable(tIndex) - self.logDebug("[OPS] Finished adding data") + if gConfig.record_ops: + self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName)) + self.fAddLogDone.flush() + os.fsync(self.fAddLogDone) + self.activeTable.discard(i) # not raising an error, unlike remove # Deterministic random number generator @@ -1288,7 +1296,67 @@ class LoggingFilter(logging.Filter): # return False return True +class MainExec: + @classmethod + def runClient(cls): + # resetDb = False # DEBUG only + # dbState = DbState(resetDb) # DBEUG only! + dbManager = DbManager() # Regular function + Dice.seed(0) # initial seeding of dice + thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps) + tc = ThreadCoordinator(thPool, dbManager) + tc.run() + tc.logStats() + dbManager.cleanUp() + + @classmethod + def runService(cls): + print("Running service...") + + @classmethod + def runTemp(cls): # for debugging purposes + # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix + # dbc = dbState.getDbConn() + # sTbName = dbState.getFixedSuperTableName() + # dbc.execute("create database if not exists db") + # if not dbState.getState().equals(StateEmpty()): + # dbc.execute("use db") + + # rTables = None + # try: # the super table may not exist + # sql = "select TBNAME from db.{}".format(sTbName) + # logger.info("Finding out tables in super table: {}".format(sql)) + # dbc.query(sql) # TODO: analyze result set later + # logger.info("Fetching result") + # rTables = dbc.getQueryResult() + # logger.info("Result: {}".format(rTables)) + # except taos.error.ProgrammingError as err: + # logger.info("Initial Super table OPS error: {}".format(err)) + + # # sys.exit() + # if ( not rTables == None): + # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) + # try: + # for rTbName in rTables : # regular tables + # ds = dbState + # logger.info("Inserting into table: {}".format(rTbName[0])) + # sql = "insert into db.{} values ('{}', {});".format( + # rTbName[0], + # ds.getNextTick(), ds.getNextInt()) + # dbc.execute(sql) + # for rTbName in rTables : # regular tables + # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure + # logger.info("Initial READING operation is successful") + # except taos.error.ProgrammingError as err: + # logger.info("Initial WRITE/READ error: {}".format(err)) + + # Sandbox testing code + # dbc = dbState.getDbConn() + # while True: + # rows = dbc.query("show databases") + # print("Rows: {}, time={}".format(rows, time.time())) + return def main(): # Super cool Python argument library: https://docs.python.org/3/library/argparse.html @@ -1301,20 +1369,27 @@ def main(): 2. You run the server there before this script: ./build/bin/taosd -c test/cfg ''')) - parser.add_argument('-p', '--per-thread-db-connection', action='store_true', - help='Use a single shared db connection (default: false)') + parser.add_argument('-d', '--debug', action='store_true', help='Turn on DEBUG mode for more logging (default: false)') - parser.add_argument('-s', '--max-steps', action='store', default=100, type=int, + parser.add_argument('-e', '--run-tdengine', action='store_true', + help='Run TDengine service in foreground (default: false)') + parser.add_argument('-l', '--larger-data', action='store_true', + help='Write larger amount of data during write operations (default: false)') + parser.add_argument('-p', '--per-thread-db-connection', action='store_false', + help='Use a single shared db connection (default: false)') + parser.add_argument('-r', '--record-ops', action='store_true', + help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)') + parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int, help='Maximum number of steps to run (default: 100)') - parser.add_argument('-t', '--num-threads', action='store', default=10, type=int, + parser.add_argument('-t', '--num-threads', action='store', default=5, type=int, help='Number of threads to run (default: 10)') global gConfig gConfig = parser.parse_args() - if len(sys.argv) == 1: - parser.print_help() - sys.exit() + # if len(sys.argv) == 1: + # parser.print_help() + # sys.exit() global logger logger = logging.getLogger('CrashGen') @@ -1326,62 +1401,11 @@ def main(): ch = logging.StreamHandler() logger.addHandler(ch) - # resetDb = False # DEBUG only - # dbState = DbState(resetDb) # DBEUG only! - dbState = DbState() # Regular function - Dice.seed(0) # initial seeding of dice - tc = ThreadCoordinator( - ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0), - # WorkDispatcher(dbState), # Obsolete? - dbState - ) + if gConfig.run_tdengine : # run server + MainExec.runService() + else : + MainExec.runClient() - # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix - # dbc = dbState.getDbConn() - # sTbName = dbState.getFixedSuperTableName() - # dbc.execute("create database if not exists db") - # if not dbState.getState().equals(StateEmpty()): - # dbc.execute("use db") - - # rTables = None - # try: # the super table may not exist - # sql = "select TBNAME from db.{}".format(sTbName) - # logger.info("Finding out tables in super table: {}".format(sql)) - # dbc.query(sql) # TODO: analyze result set later - # logger.info("Fetching result") - # rTables = dbc.getQueryResult() - # logger.info("Result: {}".format(rTables)) - # except taos.error.ProgrammingError as err: - # logger.info("Initial Super table OPS error: {}".format(err)) - - # # sys.exit() - # if ( not rTables == None): - # # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0]))) - # try: - # for rTbName in rTables : # regular tables - # ds = dbState - # logger.info("Inserting into table: {}".format(rTbName[0])) - # sql = "insert into db.{} values ('{}', {});".format( - # rTbName[0], - # ds.getNextTick(), ds.getNextInt()) - # dbc.execute(sql) - # for rTbName in rTables : # regular tables - # dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure - # logger.info("Initial READING operation is successful") - # except taos.error.ProgrammingError as err: - # logger.info("Initial WRITE/READ error: {}".format(err)) - - - - # Sandbox testing code - # dbc = dbState.getDbConn() - # while True: - # rows = dbc.query("show databases") - # print("Rows: {}, time={}".format(rows, time.time())) - - tc.run() - tc.logStats() - dbState.cleanUp() # logger.info("Crash_Gen execution finished") diff --git a/tests/script/general/parser/auto_create_tb.sim b/tests/script/general/parser/auto_create_tb.sim index 6065daa6d3..64fec4b56d 100644 --- a/tests/script/general/parser/auto_create_tb.sim +++ b/tests/script/general/parser/auto_create_tb.sim @@ -153,13 +153,13 @@ print $rows $data00 $data10 $data20 if $rows != 3 then return -1 endi -if $data00 != tb3 then +if $data00 != tb1 then return -1 endi if $data10 != tb2 then return -1 endi -if $data20 != tb1 then +if $data20 != tb3 then return -1 endi @@ -221,13 +221,13 @@ sql show tables if $rows != 3 then return -1 endi -if $data00 != tb3 then +if $data00 != tb1 then return -1 endi if $data10 != tb2 then return -1 endi -if $data20 != tb1 then +if $data20 != tb3 then return -1 endi diff --git a/tests/script/general/parser/projection_limit_offset.sim b/tests/script/general/parser/projection_limit_offset.sim index 5f006d0eb7..2b89946ef8 100644 --- a/tests/script/general/parser/projection_limit_offset.sim +++ b/tests/script/general/parser/projection_limit_offset.sim @@ -80,6 +80,7 @@ print $rows sql select ts from group_mt0 where ts>='1970-1-1 8:1:43' and ts<='1970-1-1 8:1:43.500' limit 8000 offset 0; if $rows != 4008 then + print expect 4008, actual:$rows return -1 endi diff --git a/tests/test/c/CMakeLists.txt b/tests/test/c/CMakeLists.txt index d40db5ee40..4717a4f769 100644 --- a/tests/test/c/CMakeLists.txt +++ b/tests/test/c/CMakeLists.txt @@ -7,15 +7,21 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc) INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc) IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) - add_executable(insertPerTable insertPerTable.c) - target_link_libraries(insertPerTable taos_static pthread) + #add_executable(insertPerTable insertPerTable.c) + #target_link_libraries(insertPerTable taos_static pthread) - add_executable(insertPerRow insertPerRow.c) - target_link_libraries(insertPerRow taos_static pthread) + #add_executable(insertPerRow insertPerRow.c) + #target_link_libraries(insertPerRow taos_static pthread) - add_executable(importOneRow importOneRow.c) - target_link_libraries(importOneRow taos_static pthread) + #add_executable(importOneRow importOneRow.c) + #target_link_libraries(importOneRow taos_static pthread) - add_executable(importPerTable importPerTable.c) - target_link_libraries(importPerTable taos_static pthread) + #add_executable(importPerTable importPerTable.c) + #target_link_libraries(importPerTable taos_static pthread) + + #add_executable(hashPerformance hashPerformance.c) + #target_link_libraries(hashPerformance taos_static tutil common pthread) + + add_executable(createTablePerformance createTablePerformance.c) + target_link_libraries(createTablePerformance taos_static tutil common pthread) ENDIF() diff --git a/tests/test/c/createTablePerformance.c b/tests/test/c/createTablePerformance.c new file mode 100644 index 0000000000..4ab6f98423 --- /dev/null +++ b/tests/test/c/createTablePerformance.c @@ -0,0 +1,232 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tulog.h" +#include "ttime.h" +#include "tutil.h" +#include "tglobal.h" +#include "hash.h" + +#define MAX_RANDOM_POINTS 20000 +#define GREEN "\033[1;32m" +#define NC "\033[0m" + +char dbName[32] = "db"; +char stableName[64] = "st"; +int32_t numOfThreads = 30; +int32_t numOfTables = 100000; +int32_t maxTables = 5000; +int32_t numOfColumns = 2; + +typedef struct { + int32_t tableBeginIndex; + int32_t tableEndIndex; + int32_t threadIndex; + char dbName[32]; + char stableName[64]; + float createTableSpeed; + pthread_t thread; +} SThreadInfo; + +void shellParseArgument(int argc, char *argv[]); +void *threadFunc(void *param); +void createDbAndSTable(); + +int main(int argc, char *argv[]) { + shellParseArgument(argc, argv); + taos_init(); + createDbAndSTable(); + + pPrint("%d threads are spawned to create table", numOfThreads); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); + + int32_t numOfTablesPerThread = numOfTables / numOfThreads; + numOfTables = numOfTablesPerThread * numOfThreads; + for (int i = 0; i < numOfThreads; ++i) { + pInfo[i].tableBeginIndex = i * numOfTablesPerThread; + pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread; + pInfo[i].threadIndex = i; + strcpy(pInfo[i].dbName, dbName); + strcpy(pInfo[i].stableName, stableName); + pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i)); + } + + taosMsleep(300); + for (int i = 0; i < numOfThreads; i++) { + pthread_join(pInfo[i].thread, NULL); + } + + float createTableSpeed = 0; + for (int i = 0; i < numOfThreads; ++i) { + createTableSpeed += pInfo[i].createTableSpeed; + } + + pPrint("%s total speed:%.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC); + + pthread_attr_destroy(&thattr); + free(pInfo); +} + +void createDbAndSTable() { + pPrint("start to create db and stable"); + char qstr[64000]; + + TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (con == NULL) { + pError("failed to connect to DB, reason:%s", taos_errstr(con)); + exit(1); + } + + sprintf(qstr, "create database if not exists %s maxtables %d", dbName, maxTables); + TAOS_RES *pSql = taos_query(con, qstr); + int32_t code = taos_errno(pSql); + if (code != 0) { + pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + + sprintf(qstr, "use %s", dbName); + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + + int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName); + for (int32_t f = 0; f < numOfColumns - 1; ++f) { + len += sprintf(qstr + len, ", f%d double", f); + } + sprintf(qstr + len, ") tags(t int)"); + + pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con)); + exit(0); + } + taos_free_result(pSql); + + taos_close(con); +} + +void *threadFunc(void *param) { + SThreadInfo *pInfo = (SThreadInfo *)param; + char qstr[65000]; + int code; + + TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (con == NULL) { + pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con)); + exit(1); + } + + sprintf(qstr, "use %s", pInfo->dbName); + TAOS_RES *pSql = taos_query(con, qstr); + taos_free_result(pSql); + + int64_t startMs = taosGetTimestampMs(); + + for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + sprintf(qstr, "create table if not exists %s%d using %s tags(%d)", stableName, t, stableName, t); + TAOS_RES *pSql = taos_query(con, qstr); + code = taos_errno(pSql); + if (code != 0) { + pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con)); + } + taos_free_result(pSql); + } + + float createTableSpeed = 0; + for (int i = 0; i < numOfThreads; ++i) { + createTableSpeed += pInfo[i].createTableSpeed; + } + + int64_t endMs = taosGetTimestampMs(); + int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; + float seconds = (endMs - startMs) / 1000.0; + float speed = totalTables / seconds; + pInfo->createTableSpeed = speed; + + pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed); + taos_close(con); + + return 0; +} + +void printHelp() { + char indent[10] = " "; + printf("Used to test the performance while create table\n"); + + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); + printf("%s%s\n", indent, "-d"); + printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", dbName); + printf("%s%s\n", indent, "-s"); + printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", stableName); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads); + printf("%s%s\n", indent, "-n"); + printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables); + printf("%s%s\n", indent, "-columns"); + printf("%s%s%s%d\n", indent, indent, "numOfColumns, default is ", numOfColumns); + printf("%s%s\n", indent, "-tables"); + printf("%s%s%s%d\n", indent, indent, "Database parameters tables, default is ", maxTables); + + exit(EXIT_SUCCESS); +} + +void shellParseArgument(int argc, char *argv[]) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-d") == 0) { + strcpy(dbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); + } else if (strcmp(argv[i], "-s") == 0) { + strcpy(stableName, argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0) { + numOfTables = atoi(argv[++i]); + } else if (strcmp(argv[i], "-tables") == 0) { + maxTables = atoi(argv[++i]); + } else if (strcmp(argv[i], "-columns") == 0) { + numOfColumns = atoi(argv[++i]); + } else { + } + } + + pPrint("%s dbName:%s %s", GREEN, dbName, NC); + pPrint("%s stableName:%s %s", GREEN, stableName, NC); + pPrint("%s configDir:%s %s", GREEN, configDir, NC); + pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC); + pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC); + pPrint("%s numOfColumns:%d %s", GREEN, numOfColumns, NC); + pPrint("%s dbPara maxTables:%d %s", GREEN, maxTables, NC); + + pPrint("%s start create table performace test %s", GREEN, NC); +} diff --git a/tests/test/c/hashPerformance.c b/tests/test/c/hashPerformance.c new file mode 100644 index 0000000000..fa3612d6e8 --- /dev/null +++ b/tests/test/c/hashPerformance.c @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taos.h" +#include "tulog.h" +#include "ttime.h" +#include "tutil.h" +#include "hash.h" + +#define MAX_RANDOM_POINTS 20000 +#define GREEN "\033[1;32m" +#define NC "\033[0m" + +int32_t capacity = 100000; +int32_t q1Times = 1; +int32_t q2Times = 1; +int32_t keyNum = 100000; +int32_t printInterval = 10000; + +typedef struct HashTestRow { + int32_t size; + void * ptr; +} HashTestRow; + +void shellParseArgument(int argc, char *argv[]); + +void testHashPerformance() { + int64_t initialMs = taosGetTimestampMs(); + _hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + void * hashHandle = taosHashInit(capacity, hashFp, true); + + int64_t startMs = taosGetTimestampMs(); + float seconds = (startMs - initialMs) / 1000.0; + pPrint("initial time %.2f sec", seconds); + + for (int32_t t = 1; t <= keyNum; ++t) { + HashTestRow row = {0}; + char key[100] = {0}; + int32_t keySize = sprintf(key, "0.db.st%d", t); + + for (int32_t q = 0; q < q1Times; q++) { + taosHashGet(hashHandle, &key, keySize); + } + + taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow)); + + for (int32_t q = 0; q < q2Times; q++) { + taosHashGet(hashHandle, &key, keySize); + } + + if (t % printInterval == 0) { + int64_t endMs = taosGetTimestampMs(); + int64_t hashSize = taosHashGetSize(hashHandle); + float seconds = (endMs - startMs) / 1000.0; + float speed = printInterval / seconds; + pPrint("time:%.2f sec, speed:%.1f rows/second, hashSize:%ld", seconds, speed, hashSize); + startMs = endMs; + } + } + + int64_t endMs = taosGetTimestampMs(); + int64_t hashSize = taosHashGetSize(hashHandle); + seconds = (endMs - initialMs) / 1000.0; + float speed = hashSize / seconds; + + pPrint("total time:%.2f sec, avg speed:%.1f rows/second, hashSize:%ld", seconds, speed, hashSize); + taosHashCleanup(hashHandle); +} + +int main(int argc, char *argv[]) { + shellParseArgument(argc, argv); + testHashPerformance(); +} + +void printHelp() { + char indent[10] = " "; + printf("Used to test the performance of cache\n"); + + printf("%s%s\n", indent, "-k"); + printf("%s%s%s%d\n", indent, indent, "key num, default is ", keyNum); + printf("%s%s\n", indent, "-p"); + printf("%s%s%s%d\n", indent, indent, "print interval while put into hash, default is ", printInterval); + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%d\n", indent, indent, "the initial capacity of hash ", capacity); + printf("%s%s\n", indent, "-q1"); + printf("%s%s%s%d\n", indent, indent, "query times before put into hash", q1Times); + printf("%s%s\n", indent, "-q2"); + printf("%s%s%s%d\n", indent, indent, "query times after put into hash", q2Times); + + exit(EXIT_SUCCESS); +} + +void shellParseArgument(int argc, char *argv[]) { + for (int i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-k") == 0) { + keyNum = atoi(argv[++i]); + } else if (strcmp(argv[i], "-p") == 0) { + printInterval = atoi(argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + capacity = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q1") == 0) { + q1Times = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q2") == 0) { + q2Times = atoi(argv[++i]); + } else { + } + } + + pPrint("%s capacity:%d %s", GREEN, capacity, NC); + pPrint("%s printInterval:%d %s", GREEN, printInterval, NC); + pPrint("%s q1Times:%d %s", GREEN, q1Times, NC); + pPrint("%s q2Times:%d %s", GREEN, q2Times, NC); + pPrint("%s keyNum:%d %s", GREEN, keyNum, NC); +}