Merge branch 'hotfix/leak' of https://github.com/taosdata/TDengine into hotfix/leak
This commit is contained in:
commit
6ecec7ac05
|
@ -34,7 +34,9 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
|||
VERSION_INFO)
|
||||
MESSAGE(STATUS "build version ${VERSION_INFO}")
|
||||
SET_TARGET_PROPERTIES(taos PROPERTIES VERSION ${VERSION_INFO} SOVERSION 1)
|
||||
|
||||
|
||||
ADD_SUBDIRECTORY(tests)
|
||||
|
||||
ELSEIF (TD_WINDOWS_64)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/windows/win32)
|
||||
|
|
|
@ -1351,6 +1351,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
|
|||
static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlocks *pTableDataBlocks) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
pSql->res.numOfRows = 0;
|
||||
|
||||
assert(pCmd->numOfClause == 1);
|
||||
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
|
||||
|
@ -1394,6 +1395,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
|
|||
fclose(fp);
|
||||
|
||||
pParentSql->res.code = code;
|
||||
tscQueueAsyncRes(pParentSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1458,8 +1460,11 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int code) {
|
|||
free(line);
|
||||
|
||||
if (count > 0) {
|
||||
if ((code = doPackSendDataBlock(pSql, count, pTableDataBlock)) != TSDB_CODE_SUCCESS) {
|
||||
code = doPackSendDataBlock(pSql, count, pTableDataBlock);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pParentSql->res.code = code;
|
||||
tscQueueAsyncRes(pParentSql);
|
||||
return;
|
||||
}
|
||||
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||
PROJECT(TDengine)
|
||||
|
||||
FIND_PATH(HEADER_GTEST_INCLUDE_DIR gtest.h /usr/include/gtest /usr/local/include/gtest)
|
||||
FIND_LIBRARY(LIB_GTEST_STATIC_DIR libgtest.a /usr/lib/ /usr/local/lib)
|
||||
|
||||
IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR)
|
||||
MESSAGE(STATUS "gTest library found, build unit test")
|
||||
|
||||
INCLUDE_DIRECTORIES(${HEADER_GTEST_INCLUDE_DIR})
|
||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
ADD_EXECUTABLE(cliTest ${SOURCE_LIST})
|
||||
TARGET_LINK_LIBRARIES(cliTest taos tutil common gtest pthread)
|
||||
ENDIF()
|
|
@ -20,6 +20,7 @@
|
|||
#include "tconfig.h"
|
||||
#include "tutil.h"
|
||||
|
||||
// TODO refactor to set the tz value through parameter
|
||||
void tsSetTimeZone() {
|
||||
SGlobalCfg *cfg_timezone = taosGetConfigOption("timezone");
|
||||
uPrint("timezone is set to %s by %s", tsTimezone, tsCfgStatusStr[cfg_timezone->cfgStatus]);
|
||||
|
|
|
@ -113,6 +113,7 @@ void dnodeFreeMnodePqueue() {
|
|||
void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) {
|
||||
if (!mnodeIsRunning() || tsMPeerQueue == NULL) {
|
||||
dnodeSendRedirectMsg(pMsg, false);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -116,6 +116,7 @@ void dnodeFreeMnodeRqueue() {
|
|||
void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) {
|
||||
if (!mnodeIsRunning() || tsMReadQueue == NULL) {
|
||||
dnodeSendRedirectMsg(pMsg, true);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -115,6 +115,7 @@ void dnodeFreeMnodeWqueue() {
|
|||
void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) {
|
||||
if (!mnodeIsRunning() || tsMWriteQueue == NULL) {
|
||||
dnodeSendRedirectMsg(pMsg, true);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,9 +38,9 @@ static int32_t tsDnodeQueryReqNum = 0;
|
|||
static int32_t tsDnodeSubmitReqNum = 0;
|
||||
|
||||
int32_t dnodeInitShell() {
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVnodeWriteQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue;
|
||||
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVnodeWriteQueue;
|
||||
|
||||
// the following message shall be treated as mnode write
|
||||
|
|
|
@ -354,7 +354,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
|||
#define TSDB_DEFAULT_DBS_HASH_SIZE 100
|
||||
#define TSDB_DEFAULT_VGROUPS_HASH_SIZE 100
|
||||
#define TSDB_DEFAULT_STABLES_HASH_SIZE 100
|
||||
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 10000
|
||||
#define TSDB_DEFAULT_CTABLES_HASH_SIZE 20000
|
||||
|
||||
#define TSDB_PORT_DNODESHELL 0
|
||||
#define TSDB_PORT_DNODEDNODE 5
|
||||
|
|
|
@ -499,7 +499,7 @@ int main(int argc, char *argv[]) {
|
|||
/* Create all the tables; */
|
||||
printf("Creating %d table(s)......\n", ntables);
|
||||
for (int i = 0; i < ntables; i++) {
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d (ts timestamp%s;", db_name, tb_prefix, i, cols);
|
||||
queryDB(taos, command);
|
||||
}
|
||||
|
||||
|
@ -509,7 +509,7 @@ int main(int argc, char *argv[]) {
|
|||
} else {
|
||||
/* Create metric table */
|
||||
printf("Creating meters super table...\n");
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.meters (ts timestamp%s tags (areaid int, loc binary(10))", db_name, cols);
|
||||
queryDB(taos, command);
|
||||
printf("meters created!\n");
|
||||
|
||||
|
@ -523,9 +523,9 @@ int main(int argc, char *argv[]) {
|
|||
j = i % 10;
|
||||
}
|
||||
if (j % 2 == 0) {
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai");
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "shanghai");
|
||||
} else {
|
||||
snprintf(command, BUFFER_SIZE, "create table %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing");
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.meters tags (%d,\"%s\");", db_name, tb_prefix, i, db_name, j, "beijing");
|
||||
}
|
||||
queryDB(taos, command);
|
||||
}
|
||||
|
@ -847,10 +847,10 @@ void *syncWrite(void *sarg) {
|
|||
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, tID);
|
||||
int k;
|
||||
for (k = 0; k < winfo->nrecords_per_request;) {
|
||||
int rand_num = trand() % 100;
|
||||
int rand_num = rand() % 100;
|
||||
int len = -1;
|
||||
if (winfo->data_of_order ==1 && rand_num < winfo->data_of_rate) {
|
||||
long d = tmp_time - trand() % 1000000 + rand_num;
|
||||
long d = tmp_time - rand() % 1000000 + rand_num;
|
||||
len = generateData(data, data_type, ncols_per_record, d, len_of_binary);
|
||||
} else {
|
||||
len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
|
||||
|
@ -942,10 +942,10 @@ void callBack(void *param, TAOS_RES *res, int code) {
|
|||
pstr += sprintf(pstr, "insert into %s values", tb_info->tb_name);
|
||||
|
||||
for (int i = 0; i < tb_info->nrecords_per_request; i++) {
|
||||
int rand_num = trand() % 100;
|
||||
int rand_num = rand() % 100;
|
||||
if (tb_info->data_of_order ==1 && rand_num < tb_info->data_of_rate)
|
||||
{
|
||||
long d = tmp_time - trand() % 1000000 + rand_num;
|
||||
long d = tmp_time - rand() % 1000000 + rand_num;
|
||||
generateData(data, datatype, ncols_per_record, d, len_of_binary);
|
||||
} else
|
||||
{
|
||||
|
@ -994,20 +994,20 @@ int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t times
|
|||
|
||||
for (int i = 0; i < num_of_cols; i++) {
|
||||
if (strcasecmp(data_type[i % c], "tinyint") == 0) {
|
||||
pstr += sprintf(pstr, ", %d", (int)(trand() % 128));
|
||||
pstr += sprintf(pstr, ", %d", (int)(rand() % 128));
|
||||
} else if (strcasecmp(data_type[i % c], "smallint") == 0) {
|
||||
pstr += sprintf(pstr, ", %d", (int)(trand() % 32767));
|
||||
pstr += sprintf(pstr, ", %d", (int)(rand() % 32767));
|
||||
} else if (strcasecmp(data_type[i % c], "int") == 0) {
|
||||
pstr += sprintf(pstr, ", %d", (int)(trand() % 10));
|
||||
pstr += sprintf(pstr, ", %d", (int)(rand() % 10));
|
||||
} else if (strcasecmp(data_type[i % c], "bigint") == 0) {
|
||||
pstr += sprintf(pstr, ", %" PRId64, trand() % 2147483648);
|
||||
pstr += sprintf(pstr, ", %" PRId64, rand() % 2147483648);
|
||||
} else if (strcasecmp(data_type[i % c], "float") == 0) {
|
||||
pstr += sprintf(pstr, ", %10.4f", (float)(trand() / 1000.0));
|
||||
pstr += sprintf(pstr, ", %10.4f", (float)(rand() / 1000.0));
|
||||
} else if (strcasecmp(data_type[i % c], "double") == 0) {
|
||||
double t = (double)(trand() / 1000000.0);
|
||||
double t = (double)(rand() / 1000000.0);
|
||||
pstr += sprintf(pstr, ", %20.8f", t);
|
||||
} else if (strcasecmp(data_type[i % c], "bool") == 0) {
|
||||
bool b = trand() & 1;
|
||||
bool b = rand() & 1;
|
||||
pstr += sprintf(pstr, ", %s", b ? "true" : "false");
|
||||
} else if (strcasecmp(data_type[i % c], "binary") == 0) {
|
||||
char s[len_of_binary];
|
||||
|
@ -1033,7 +1033,7 @@ void rand_string(char *str, int size) {
|
|||
--size;
|
||||
int n;
|
||||
for (n = 0; n < size; n++) {
|
||||
int key = trand() % (int)(sizeof charset - 1);
|
||||
int key = rand() % (int)(sizeof charset - 1);
|
||||
str[n] = charset[key];
|
||||
}
|
||||
str[n] = 0;
|
||||
|
|
|
@ -968,6 +968,17 @@ static int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg) {
|
|||
return mnodeAlterDb(pMsg->pDb, pAlter, pMsg);
|
||||
}
|
||||
|
||||
static int32_t mnodeDropDbCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
SDbObj *pDb = pMsg->pDb;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("db:%s, failed to drop from sdb, reason:%s", pDb->name, tstrerror(code));
|
||||
} else {
|
||||
mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
|
||||
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
||||
|
||||
|
@ -978,12 +989,12 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
|
|||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsDbSdb,
|
||||
.pObj = pDb,
|
||||
.pMsg = pMsg
|
||||
.pMsg = pMsg,
|
||||
.cb = mnodeDropDbCb
|
||||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
mLPrint("db:%s, is dropped by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
|
||||
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -250,7 +250,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
|||
sdbTrace("forward request confirmed, version:%" PRIu64 ", result:%s", (int64_t)param, tstrerror(code));
|
||||
}
|
||||
|
||||
static int32_t sdbForwardToPeer(SWalHead *pHead) {
|
||||
static int32_t sdbForwardToPeer(SWalHead *pHead) {
|
||||
if (tsSdbObj.sync == NULL) return TSDB_CODE_SUCCESS;
|
||||
|
||||
int32_t code = syncForwardToPeer(tsSdbObj.sync, pHead, (void*)pHead->version, TAOS_QTYPE_RPC);
|
||||
|
@ -782,7 +782,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
|
|||
pTable->restoredFp = pDesc->restoredFp;
|
||||
|
||||
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
|
||||
if (pTable->keyType == SDB_KEY_STRING) {
|
||||
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
|
||||
hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
}
|
||||
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
|
||||
|
|
|
@ -854,13 +854,15 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||
SSuperTableObj *pTable = (SSuperTableObj *)pMsg->pTable;
|
||||
if (pTable != NULL) {
|
||||
mLPrint("app:%p:%p, stable:%s, is dropped from sdb, result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
|
||||
tstrerror(code));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
|
||||
} else {
|
||||
mLPrint("app:%p:%p, stable:%s, is dropped from sdb", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
||||
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
|
||||
|
||||
|
@ -899,12 +901,10 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
|
|||
};
|
||||
|
||||
int32_t code = sdbDeleteRow(&oper);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("app:%p:%p, table:%s, failed to drop, sdb error", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId);
|
||||
return code;
|
||||
} else {
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mnodeFindSuperTableTagIndex(SSuperTableObj *pStable, const char *tagName) {
|
||||
|
|
|
@ -767,6 +767,9 @@ static void* getDataBlockImpl(SArray* pDataBlock, int32_t colId) {
|
|||
|
||||
static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas, int32_t col, int32_t size,
|
||||
SArray *pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
char *dataBlock = NULL;
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
|
@ -819,7 +822,7 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas
|
|||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions
|
||||
* @param pRuntimeEnv
|
||||
* @param forwardStep
|
||||
* @param tsCols
|
||||
|
@ -854,6 +857,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
|
|||
|
||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery);
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win) != TSDB_CODE_SUCCESS) {
|
||||
tfree(sasArray);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1060,16 +1064,18 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
STableQueryInfo* item = pQuery->current;
|
||||
|
||||
TSKEY *tsCols = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
|
||||
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
||||
|
||||
SColumnInfoData* pColumnInfoData = (SColumnInfoData *)taosArrayGet(pDataBlock, 0);
|
||||
|
||||
TSKEY *tsCols = (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (TSKEY*) pColumnInfoData->pData:NULL;
|
||||
bool groupbyColumnValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
|
||||
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
|
||||
|
||||
int16_t type = 0;
|
||||
int16_t bytes = 0;
|
||||
|
||||
char *groupbyColumnData = NULL;
|
||||
if (groupbyStateValue) {
|
||||
if (groupbyColumnValue) {
|
||||
groupbyColumnData = getGroupbyColumnData(pQuery, &type, &bytes, pDataBlock);
|
||||
}
|
||||
|
||||
|
@ -1157,7 +1163,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
pWindowResInfo->curIndex = index;
|
||||
} else { // other queries
|
||||
// decide which group this rows belongs to according to current state value
|
||||
if (groupbyStateValue) {
|
||||
if (groupbyColumnValue) {
|
||||
char *val = groupbyColumnData + bytes * offset;
|
||||
|
||||
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes);
|
||||
|
@ -1182,9 +1188,14 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
item->lastKey = tsCols[offset] + step;
|
||||
|
||||
|
||||
assert(offset >= 0);
|
||||
if (tsCols != NULL) {
|
||||
item->lastKey = tsCols[offset] + step;
|
||||
} else {
|
||||
item->lastKey = (QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.ekey:pDataBlockInfo->window.skey) + step;
|
||||
}
|
||||
|
||||
// todo refactor: extract method
|
||||
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
if (pQuery->pSelectExpr[i].base.functionId != TSDB_FUNC_ARITHM) {
|
||||
|
@ -1349,10 +1360,13 @@ static void setCtxTagColumnInfo(SQuery *pQuery, SQLFunctionCtx *pCtx) {
|
|||
// the column may be the normal column, group by normal_column, the functionId is TSDB_FUNC_PRJ
|
||||
}
|
||||
}
|
||||
|
||||
p->tagInfo.pTagCtxList = pTagCtx;
|
||||
p->tagInfo.numOfTagCols = num;
|
||||
p->tagInfo.tagsLen = tagLen;
|
||||
if (p != NULL) {
|
||||
p->tagInfo.pTagCtxList = pTagCtx;
|
||||
p->tagInfo.numOfTagCols = num;
|
||||
p->tagInfo.tagsLen = tagLen;
|
||||
} else {
|
||||
tfree(pTagCtx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3497,7 +3511,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde
|
|||
continue;
|
||||
}
|
||||
|
||||
assert(result[i].numOfRows >= 0 && pQInfo->offset <= 1);
|
||||
assert(pQInfo->offset <= 1);
|
||||
|
||||
int32_t numOfRowsToCopy = result[i].numOfRows - pQInfo->offset;
|
||||
int32_t oldOffset = pQInfo->offset;
|
||||
|
@ -5295,9 +5309,9 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
|
|||
bytes = s.bytes;
|
||||
} else{
|
||||
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
|
||||
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags || j == TSDB_TBNAME_COLUMN_INDEX);
|
||||
assert(j < pQueryMsg->numOfCols || j < pQueryMsg->numOfTags);
|
||||
|
||||
if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX) {
|
||||
if (pExprs[i].base.colInfo.colId != TSDB_TBNAME_COLUMN_INDEX && j >= 0) {
|
||||
SColumnInfo* pCol = (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag))? &pTagCols[j]:&pQueryMsg->colList[j];
|
||||
type = pCol->type;
|
||||
bytes = pCol->bytes;
|
||||
|
@ -5339,8 +5353,6 @@ static int32_t createQFunctionExprFromMsg(SQueryTableMsg *pQueryMsg, SExprInfo *
|
|||
assert(ret == TSDB_CODE_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
tfree(pExprMsg);
|
||||
*pExprInfo = pExprs;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -5591,11 +5603,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
|||
pQInfo->signature = pQInfo;
|
||||
|
||||
pQInfo->tableGroupInfo = *pTableGroupInfo;
|
||||
size_t numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
|
||||
size_t numOfGroups = 0;
|
||||
if (pTableGroupInfo->pGroupList != NULL) {
|
||||
numOfGroups = taosArrayGetSize(pTableGroupInfo->pGroupList);
|
||||
|
||||
pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
|
||||
pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables;
|
||||
}
|
||||
|
||||
pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES);
|
||||
pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables;
|
||||
|
||||
int tableIndex = 0;
|
||||
STimeWindow window = pQueryMsg->window;
|
||||
taosArraySort(pTableIdList, compareTableIdInfo);
|
||||
|
@ -5693,7 +5708,8 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
|||
pTSBuf = tsBufCreateFromCompBlocks(tsBlock, pQueryMsg->tsNumOfBlocks, pQueryMsg->tsLen, pQueryMsg->tsOrder);
|
||||
|
||||
tsBufResetPos(pTSBuf);
|
||||
tsBufNextPos(pTSBuf);
|
||||
bool ret = tsBufNextPos(pTSBuf);
|
||||
UNUSED(ret);
|
||||
}
|
||||
|
||||
// only the successful complete requries the sem_post/over = 1 operations.
|
||||
|
@ -5839,18 +5855,23 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
|
|||
|
||||
// make sure file exist
|
||||
if (FD_VALID(fd)) {
|
||||
size_t s = lseek(fd, 0, SEEK_END);
|
||||
qTrace("QInfo:%p ts comp data return, file:%s, size:%zu", pQInfo, pQuery->sdata[0]->data, s);
|
||||
|
||||
lseek(fd, 0, SEEK_SET);
|
||||
read(fd, data, s);
|
||||
int32_t s = lseek(fd, 0, SEEK_END);
|
||||
UNUSED(s);
|
||||
qTrace("QInfo:%p ts comp data return, file:%s, size:%d", pQInfo, pQuery->sdata[0]->data, s);
|
||||
s = lseek(fd, 0, SEEK_SET);
|
||||
if (s >= 0) {
|
||||
size_t sz = read(fd, data, s);
|
||||
UNUSED(sz);
|
||||
}
|
||||
close(fd);
|
||||
|
||||
unlink(pQuery->sdata[0]->data);
|
||||
} else {
|
||||
// todo return the error code to client
|
||||
// todo return the error code to client and handle invalid fd
|
||||
qError("QInfo:%p failed to open tmp file to send ts-comp data to client, path:%s, reason:%s", pQInfo,
|
||||
pQuery->sdata[0]->data, strerror(errno));
|
||||
if (fd != -1) {
|
||||
close(fd);
|
||||
}
|
||||
}
|
||||
|
||||
// all data returned, set query over
|
||||
|
@ -5903,7 +5924,6 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
|
|||
}
|
||||
|
||||
if ((code = createQFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
||||
free(pExprMsg);
|
||||
goto _over;
|
||||
}
|
||||
|
||||
|
@ -5975,6 +5995,7 @@ _over:
|
|||
}
|
||||
free(pTagColumnInfo);
|
||||
free(pExprs);
|
||||
free(pExprMsg);
|
||||
taosArrayDestroy(pTableIdList);
|
||||
|
||||
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
|
||||
|
|
|
@ -880,8 +880,11 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
|
|||
|
||||
for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) {
|
||||
size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file);
|
||||
UNUSED(sz);
|
||||
tMemBucketPut(pMemBucket, pPage->data, pPage->num);
|
||||
if (sz != pMemBuffer->pageSize) {
|
||||
uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path);
|
||||
} else {
|
||||
tMemBucketPut(pMemBucket, pPage->data, pPage->num);
|
||||
}
|
||||
}
|
||||
|
||||
fclose(pMemBuffer->file);
|
||||
|
|
|
@ -331,6 +331,7 @@ void rpcFreeCont(void *cont) {
|
|||
if ( cont ) {
|
||||
char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
|
||||
free(temp);
|
||||
// tTrace("free mem: %p", temp);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -540,6 +541,7 @@ static void rpcFreeMsg(void *msg) {
|
|||
if ( msg ) {
|
||||
char *temp = (char *)msg - sizeof(SRpcReqContext);
|
||||
free(temp);
|
||||
// tTrace("free mem: %p", temp);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -418,6 +418,8 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
|
|||
if ( NULL == buffer) {
|
||||
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
|
||||
return -1;
|
||||
} else {
|
||||
// tTrace("malloc mem: %p", buffer);
|
||||
}
|
||||
|
||||
msg = buffer + tsRpcOverhead;
|
||||
|
|
|
@ -211,6 +211,8 @@ static void *taosRecvUdpData(void *param) {
|
|||
if (NULL == tmsg) {
|
||||
tError("%s failed to allocate memory, size:%ld", pConn->label, dataLen);
|
||||
continue;
|
||||
} else {
|
||||
// tTrace("malloc mem: %p", tmsg);
|
||||
}
|
||||
|
||||
tmsg += tsRpcOverhead; // overhead for SRpcReqContext
|
||||
|
|
|
@ -553,10 +553,18 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tsdbRefTable(STable *pTable) { T_REF_INC(pTable); }
|
||||
void tsdbRefTable(STable *pTable) {
|
||||
int16_t ref = T_REF_INC(pTable);
|
||||
tsdbTrace("ref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref);
|
||||
}
|
||||
|
||||
void tsdbUnRefTable(STable *pTable) {
|
||||
if (T_REF_DEC(pTable) == 0) {
|
||||
int16_t ref = T_REF_DEC(pTable);
|
||||
tsdbTrace("unref table:%s, uid:%"PRIu64", tid:%d, ref:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid, ref);
|
||||
|
||||
if (ref == 0) {
|
||||
tsdbTrace("destroy table:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), pTable->tableId.uid, pTable->tableId.tid);
|
||||
|
||||
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
|
||||
tsdbUnRefTable(pTable->pSuper);
|
||||
}
|
||||
|
|
|
@ -38,7 +38,7 @@ static FORCE_INLINE int32_t getSkipListNodeRandomHeight(SSkipList *pSkipList) {
|
|||
const uint32_t factor = 4;
|
||||
|
||||
int32_t n = 1;
|
||||
while ((taosRand() % factor) == 0 && n <= pSkipList->maxLevel) {
|
||||
while ((rand() % factor) == 0 && n <= pSkipList->maxLevel) {
|
||||
n++;
|
||||
}
|
||||
|
||||
|
|
|
@ -48,23 +48,21 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
|
|||
const unsigned int day, const unsigned int hour,
|
||||
const unsigned int min, const unsigned int sec)
|
||||
{
|
||||
unsigned int mon = mon0, year = year0;
|
||||
unsigned int mon = mon0, year = year0;
|
||||
|
||||
/* 1..12 -> 11,12,1..10 */
|
||||
if (0 >= (int) (mon -= 2)) {
|
||||
mon += 12; /* Puts Feb last since it has leap day */
|
||||
year -= 1;
|
||||
}
|
||||
/* 1..12 -> 11,12,1..10 */
|
||||
if (0 >= (int) (mon -= 2)) {
|
||||
mon += 12; /* Puts Feb last since it has leap day */
|
||||
year -= 1;
|
||||
}
|
||||
|
||||
//int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
|
||||
// year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
|
||||
int64_t res;
|
||||
res = 367*((int64_t)mon)/12;
|
||||
res += year/4 - year/100 + year/400 + day + year*365 - 719499;
|
||||
int64_t res = 367*((int64_t)mon)/12;
|
||||
|
||||
res += ((int64_t)(year/4 - year/100 + year/400 + day + year*365) - 719499); // this value may be less than 0
|
||||
res = res*24;
|
||||
res = ((res + hour) * 60 + min) * 60 + sec;
|
||||
|
||||
return (res + timezone);
|
||||
return (res + timezone);
|
||||
}
|
||||
// ==== mktime() kernel code =================//
|
||||
static int64_t m_deltaUtc = 0;
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
|
||||
|
||||
import sys
|
||||
import os
|
||||
import traceback
|
||||
# Require Python 3
|
||||
if sys.version_info[0] < 3:
|
||||
|
@ -32,6 +33,7 @@ import textwrap
|
|||
|
||||
from typing import List
|
||||
from typing import Dict
|
||||
from typing import Set
|
||||
|
||||
from util.log import *
|
||||
from util.dnodes import *
|
||||
|
@ -42,7 +44,10 @@ import crash_gen
|
|||
import taos
|
||||
|
||||
# Global variables, tried to keep a small number.
|
||||
gConfig = None # Command-line/Environment Configurations, will set a bit later
|
||||
|
||||
# Command-line/Environment Configurations, will set a bit later
|
||||
# ConfigNameSpace = argparse.Namespace
|
||||
gConfig = argparse.Namespace() # Dummy value, will be replaced later
|
||||
logger = None
|
||||
|
||||
def runThread(wt: WorkerThread):
|
||||
|
@ -64,7 +69,7 @@ class WorkerThread:
|
|||
# self._curStep = -1
|
||||
self._pool = pool
|
||||
self._tid = tid
|
||||
self._tc = tc
|
||||
self._tc = tc # type: ThreadCoordinator
|
||||
# self.threadIdent = threading.get_ident()
|
||||
self._thread = threading.Thread(target=runThread, args=(self,))
|
||||
self._stepGate = threading.Event()
|
||||
|
@ -156,13 +161,13 @@ class WorkerThread:
|
|||
if ( gConfig.per_thread_db_connection ):
|
||||
return self._dbConn.execute(sql)
|
||||
else:
|
||||
return self._tc.getDbState().getDbConn().execute(sql)
|
||||
return self._tc.getDbManager().getDbConn().execute(sql)
|
||||
|
||||
def getDbConn(self):
|
||||
if ( gConfig.per_thread_db_connection ):
|
||||
return self._dbConn
|
||||
else:
|
||||
return self._tc.getDbState().getDbConn()
|
||||
return self._tc.getDbManager().getDbConn()
|
||||
|
||||
# def querySql(self, sql): # not "execute", since we are out side the DB context
|
||||
# if ( gConfig.per_thread_db_connection ):
|
||||
|
@ -171,12 +176,12 @@ class WorkerThread:
|
|||
# return self._tc.getDbState().getDbConn().query(sql)
|
||||
|
||||
class ThreadCoordinator:
|
||||
def __init__(self, pool, dbState):
|
||||
def __init__(self, pool, dbManager):
|
||||
self._curStep = -1 # first step is 0
|
||||
self._pool = pool
|
||||
# self._wd = wd
|
||||
self._te = None # prepare for every new step
|
||||
self._dbState = dbState
|
||||
self._dbManager = dbManager
|
||||
self._executedTasks: List[Task] = [] # in a given step
|
||||
self._lock = threading.RLock() # sync access for a few things
|
||||
|
||||
|
@ -186,8 +191,8 @@ class ThreadCoordinator:
|
|||
def getTaskExecutor(self):
|
||||
return self._te
|
||||
|
||||
def getDbState(self) -> DbState :
|
||||
return self._dbState
|
||||
def getDbManager(self) -> DbManager :
|
||||
return self._dbManager
|
||||
|
||||
def crossStepBarrier(self):
|
||||
self._stepBarrier.wait()
|
||||
|
@ -211,7 +216,7 @@ class ThreadCoordinator:
|
|||
|
||||
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
|
||||
try:
|
||||
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
|
||||
self._dbManager.getStateMachine().transition(self._executedTasks) # at end of step, transiton the DB state
|
||||
except taos.error.ProgrammingError as err:
|
||||
if ( err.msg == 'network unavailable' ): # broken DB connection
|
||||
logger.info("DB connection broken, execution failed")
|
||||
|
@ -284,8 +289,8 @@ class ThreadCoordinator:
|
|||
# logger.debug(" (dice:{}/{}) ".format(i, nTasks))
|
||||
# # return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
|
||||
# return tasks[i].clone() # TODO: still necessary?
|
||||
taskType = self.getDbState().pickTaskType() # pick a task type for current state
|
||||
return taskType(self.getDbState(), self._execStats) # create a task from it
|
||||
taskType = self.getDbManager().getStateMachine().pickTaskType() # pick a task type for current state
|
||||
return taskType(self.getDbManager(), self._execStats) # create a task from it
|
||||
|
||||
def resetExecutedTasks(self):
|
||||
self._executedTasks = [] # should be under single thread
|
||||
|
@ -296,16 +301,12 @@ class ThreadCoordinator:
|
|||
|
||||
# We define a class to run a number of threads in locking steps.
|
||||
class ThreadPool:
|
||||
def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
|
||||
def __init__(self, numThreads, maxSteps):
|
||||
self.numThreads = numThreads
|
||||
self.maxSteps = maxSteps
|
||||
self.funcSequencer = funcSequencer
|
||||
# Internal class variables
|
||||
# self.dispatcher = WorkDispatcher(dbState) # Obsolete?
|
||||
self.curStep = 0
|
||||
self.threadList = []
|
||||
# self.stepGate = threading.Condition() # Gate to hold/sync all threads
|
||||
# self.numWaitingThreads = 0
|
||||
|
||||
# starting to run all the threads, in locking steps
|
||||
def createAndStartThreads(self, tc: ThreadCoordinator):
|
||||
|
@ -319,7 +320,8 @@ class ThreadPool:
|
|||
logger.debug("Joining thread...")
|
||||
workerThread._thread.join()
|
||||
|
||||
# A queue of continguous POSITIVE integers
|
||||
# A queue of continguous POSITIVE integers, used by DbManager to generate continuous numbers
|
||||
# for new table names
|
||||
class LinearQueue():
|
||||
def __init__(self):
|
||||
self.firstIndex = 1 # 1st ever element
|
||||
|
@ -595,9 +597,9 @@ class StateEmpty(AnyState):
|
|||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( self.hasSuccess(tasks, CreateDbTask) ): # at EMPTY, if there's succes in creating DB
|
||||
if ( not self.hasTask(tasks, DropDbTask) ) : # and no drop_db tasks
|
||||
self.assertAtMostOneSuccess(tasks, CreateDbTask) # we must have at most one. TODO: compare numbers
|
||||
if ( self.hasSuccess(tasks, TaskCreateDb) ): # at EMPTY, if there's succes in creating DB
|
||||
if ( not self.hasTask(tasks, TaskDropDb) ) : # and no drop_db tasks
|
||||
self.assertAtMostOneSuccess(tasks, TaskCreateDb) # we must have at most one. TODO: compare numbers
|
||||
|
||||
class StateDbOnly(AnyState):
|
||||
def getInfo(self):
|
||||
|
@ -609,20 +611,20 @@ class StateDbOnly(AnyState):
|
|||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( not self.hasTask(tasks, CreateDbTask) ):
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more
|
||||
self.assertIfExistThenSuccess(tasks, DropDbTask)
|
||||
if ( not self.hasTask(tasks, TaskCreateDb) ):
|
||||
self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more
|
||||
self.assertIfExistThenSuccess(tasks, TaskDropDb)
|
||||
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases
|
||||
# Nothing to be said about adding data task
|
||||
if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
|
||||
# if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
|
||||
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask)
|
||||
# self.assertAtMostOneSuccess(tasks, DropDbTask)
|
||||
# self._state = self.STATE_EMPTY
|
||||
elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success
|
||||
if ( self.hasSuccess(tasks, TaskCreateSuperTable) ): # did not drop db, create table success
|
||||
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
|
||||
if ( not self.hasTask(tasks, DropFixedSuperTableTask) ):
|
||||
self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything
|
||||
self.assertNoTask(tasks, DropDbTask) # should have have tried
|
||||
if ( not self.hasTask(tasks, TaskDropSuperTable) ):
|
||||
self.assertAtMostOneSuccess(tasks, TaskCreateSuperTable) # at most 1 attempt is successful, if we don't drop anything
|
||||
# self.assertNoTask(tasks, DropDbTask) # should have have tried
|
||||
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
|
||||
# # can't say there's add-data attempts, since they may all fail
|
||||
# self._state = self.STATE_TABLE_ONLY
|
||||
|
@ -645,8 +647,8 @@ class StateSuperTableOnly(AnyState):
|
|||
]
|
||||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( self.hasSuccess(tasks, DropFixedSuperTableTask) ): # we are able to drop the table
|
||||
self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask)
|
||||
if ( self.hasSuccess(tasks, TaskDropSuperTable) ): # we are able to drop the table
|
||||
self.assertAtMostOneSuccess(tasks, TaskDropSuperTable)
|
||||
# self._state = self.STATE_DB_ONLY
|
||||
# elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table, but added data
|
||||
# self.assertNoTask(tasks, DropFixedTableTask) # not true in massively parrallel cases
|
||||
|
@ -670,35 +672,140 @@ class StateHasData(AnyState):
|
|||
|
||||
def verifyTasksToState(self, tasks, newState):
|
||||
if ( newState.equals(AnyState.STATE_EMPTY) ):
|
||||
self.hasSuccess(tasks, DropDbTask)
|
||||
self.assertAtMostOneSuccess(tasks, DropDbTask) # TODO: dicy
|
||||
self.hasSuccess(tasks, TaskDropDb)
|
||||
if ( not self.hasTask(tasks, TaskCreateDb) ) :
|
||||
self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy
|
||||
elif ( newState.equals(AnyState.STATE_DB_ONLY) ): # in DB only
|
||||
if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task
|
||||
self.assertNoTask(tasks, DropDbTask) # we must have drop_db task
|
||||
self.hasSuccess(tasks, DropFixedSuperTableTask)
|
||||
if ( not self.hasTask(tasks, TaskCreateDb)): # without a create_db task
|
||||
self.assertNoTask(tasks, TaskDropDb) # we must have drop_db task
|
||||
self.hasSuccess(tasks, TaskDropSuperTable)
|
||||
# self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
|
||||
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
|
||||
self.assertNoTask(tasks, DropDbTask)
|
||||
self.assertNoTask(tasks, DropFixedSuperTableTask)
|
||||
self.assertNoTask(tasks, AddFixedDataTask)
|
||||
self.assertNoTask(tasks, TaskDropDb)
|
||||
self.assertNoTask(tasks, TaskDropSuperTable)
|
||||
self.assertNoTask(tasks, TaskAddData)
|
||||
# self.hasSuccess(tasks, DeleteDataTasks)
|
||||
else:
|
||||
self.assertNoTask(tasks, DropDbTask)
|
||||
self.assertNoTask(tasks, DropFixedSuperTableTask)
|
||||
self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
|
||||
else: # should be STATE_HAS_DATA
|
||||
if (not self.hasTask(tasks, TaskCreateDb) ): # only if we didn't create one
|
||||
self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it
|
||||
if (not self.hasTask(tasks, TaskCreateSuperTable)) : # if we didn't create the table
|
||||
self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it
|
||||
# self.assertIfExistThenSuccess(tasks, ReadFixedDataTask)
|
||||
|
||||
class StateMechine :
|
||||
def __init__(self, dbConn):
|
||||
self._dbConn = dbConn
|
||||
self._curState = self._findCurrentState() # starting state
|
||||
self._stateWeights = [1,3,5,15] # transitition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
|
||||
|
||||
def getCurrentState(self):
|
||||
return self._curState
|
||||
|
||||
# State of the database as we believe it to be
|
||||
class DbState():
|
||||
|
||||
# May be slow, use cautionsly...
|
||||
def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
|
||||
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
|
||||
firstTaskTypes = []
|
||||
for tc in allTaskClasses:
|
||||
# t = tc(self) # create task object
|
||||
if tc.canBeginFrom(self._curState):
|
||||
firstTaskTypes.append(tc)
|
||||
# now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
|
||||
taskTypes = firstTaskTypes.copy() # have to have these
|
||||
for task1 in firstTaskTypes: # each task type gathered so far
|
||||
endState = task1.getEndState() # figure the end state
|
||||
if endState == None: # does not change end state
|
||||
continue # no use, do nothing
|
||||
for tc in allTaskClasses: # what task can further begin from there?
|
||||
if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
|
||||
taskTypes.append(tc) # gather it
|
||||
|
||||
if len(taskTypes) <= 0:
|
||||
raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
|
||||
logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, taskTypes))
|
||||
return taskTypes
|
||||
|
||||
def _findCurrentState(self):
|
||||
dbc = self._dbConn
|
||||
ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
|
||||
if dbc.query("show databases") == 0 : # no database?!
|
||||
# logger.debug("Found EMPTY state")
|
||||
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
|
||||
return StateEmpty()
|
||||
dbc.execute("use db") # did not do this when openning connection
|
||||
if dbc.query("show tables") == 0 : # no tables
|
||||
# logger.debug("Found DB ONLY state")
|
||||
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
|
||||
return StateDbOnly()
|
||||
if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) ) == 0 : # no regular tables
|
||||
# logger.debug("Found TABLE_ONLY state")
|
||||
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
|
||||
return StateSuperTableOnly()
|
||||
else: # has actual tables
|
||||
# logger.debug("Found HAS_DATA state")
|
||||
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
|
||||
return StateHasData()
|
||||
|
||||
def transition(self, tasks):
|
||||
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
|
||||
return # do nothing
|
||||
|
||||
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
|
||||
|
||||
# Generic Checks, first based on the start state
|
||||
if self._curState.canCreateDb():
|
||||
self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
|
||||
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
|
||||
|
||||
if self._curState.canDropDb():
|
||||
self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
|
||||
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
|
||||
|
||||
# if self._state.canCreateFixedTable():
|
||||
# self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
|
||||
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
|
||||
|
||||
# if self._state.canDropFixedTable():
|
||||
# self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
|
||||
# self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
|
||||
|
||||
# if self._state.canAddData():
|
||||
# self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually
|
||||
|
||||
# if self._state.canReadData():
|
||||
# Nothing for sure
|
||||
|
||||
newState = self._findCurrentState()
|
||||
logger.debug("[STT] New DB state determined: {}".format(newState))
|
||||
self._curState.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
|
||||
self._curState = newState
|
||||
|
||||
def pickTaskType(self):
|
||||
taskTypes = self.getTaskTypes() # all the task types we can choose from at curent state
|
||||
weights = []
|
||||
for tt in taskTypes:
|
||||
endState = tt.getEndState()
|
||||
if endState != None :
|
||||
weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
|
||||
else:
|
||||
weights.append(10) # read data task, default to 10: TODO: change to a constant
|
||||
i = self._weighted_choice_sub(weights)
|
||||
# logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
|
||||
return taskTypes[i]
|
||||
|
||||
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
|
||||
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
|
||||
for i, w in enumerate(weights):
|
||||
rnd -= w
|
||||
if rnd < 0:
|
||||
return i
|
||||
|
||||
# Manager of the Database Data/Connection
|
||||
class DbManager():
|
||||
def __init__(self, resetDb = True):
|
||||
self.tableNumQueue = LinearQueue()
|
||||
self._lastTick = self.setupLastTick() # datetime.datetime(2019, 1, 1) # initial date time tick
|
||||
self._lastInt = 0 # next one is initial integer
|
||||
self._lock = threading.RLock()
|
||||
|
||||
self._state = StateInvalid() # starting state
|
||||
self._stateWeights = [1,3,5,10] # indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
|
||||
|
||||
# self.openDbServerConnection()
|
||||
self._dbConn = DbConn()
|
||||
|
@ -706,7 +813,7 @@ class DbState():
|
|||
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
|
||||
except taos.error.ProgrammingError as err:
|
||||
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
|
||||
if ( err.msg == 'disconnected' ): # cannot open DB connection
|
||||
if ( err.msg == 'client disconnected' ): # cannot open DB connection
|
||||
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
|
||||
sys.exit()
|
||||
else:
|
||||
|
@ -717,13 +824,17 @@ class DbState():
|
|||
|
||||
if resetDb :
|
||||
self._dbConn.resetDb() # drop and recreate DB
|
||||
self._state = self._findCurrentState()
|
||||
|
||||
self._stateMachine = StateMechine(self._dbConn) # Do this after dbConn is in proper shape
|
||||
|
||||
def getDbConn(self):
|
||||
return self._dbConn
|
||||
|
||||
def getState(self):
|
||||
return self._state
|
||||
def getStateMachine(self):
|
||||
return self._stateMachine
|
||||
|
||||
# def getState(self):
|
||||
# return self._stateMachine.getCurrentState()
|
||||
|
||||
# We aim to create a starting time tick, such that, whenever we run our test here once
|
||||
# We should be able to safely create 100,000 records, which will not have any repeated time stamp
|
||||
|
@ -750,7 +861,8 @@ class DbState():
|
|||
tIndex = self.tableNumQueue.push()
|
||||
return tIndex
|
||||
|
||||
def getFixedSuperTableName(self):
|
||||
@classmethod
|
||||
def getFixedSuperTableName(cls):
|
||||
return "fs_table"
|
||||
|
||||
def releaseTable(self, i): # return the table back, so others can use it
|
||||
|
@ -782,122 +894,6 @@ class DbState():
|
|||
def cleanUp(self):
|
||||
self._dbConn.close()
|
||||
|
||||
# May be slow, use cautionsly...
|
||||
def getTaskTypesAtState(self):
|
||||
allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
|
||||
firstTaskTypes = []
|
||||
for tc in allTaskClasses:
|
||||
# t = tc(self) # create task object
|
||||
if tc.canBeginFrom(self._state):
|
||||
firstTaskTypes.append(tc)
|
||||
# now we have all the tasks that can begin directly from the current state, let's figure out the INDIRECT ones
|
||||
taskTypes = firstTaskTypes.copy() # have to have these
|
||||
for task1 in firstTaskTypes: # each task type gathered so far
|
||||
endState = task1.getEndState() # figure the end state
|
||||
if endState == None:
|
||||
continue
|
||||
for tc in allTaskClasses: # what task can further begin from there?
|
||||
if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
|
||||
taskTypes.append(tc) # gather it
|
||||
|
||||
if len(taskTypes) <= 0:
|
||||
raise RuntimeError("No suitable task types found for state: {}".format(self._state))
|
||||
logger.debug("[OPS] Tasks found for state {}: {}".format(self._state, taskTypes))
|
||||
return taskTypes
|
||||
|
||||
# tasks.append(ReadFixedDataTask(self)) # always for everybody
|
||||
# if ( self._state == self.STATE_EMPTY ):
|
||||
# tasks.append(CreateDbTask(self))
|
||||
# tasks.append(CreateFixedTableTask(self))
|
||||
# elif ( self._state == self.STATE_DB_ONLY ):
|
||||
# tasks.append(DropDbTask(self))
|
||||
# tasks.append(CreateFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# elif ( self._state == self.STATE_TABLE_ONLY ):
|
||||
# tasks.append(DropFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
|
||||
# tasks.append(DropFixedTableTask(self))
|
||||
# tasks.append(AddFixedDataTask(self))
|
||||
# else:
|
||||
# raise RuntimeError("Unexpected DbState state: {}".format(self._state))
|
||||
# return tasks
|
||||
|
||||
def pickTaskType(self):
|
||||
taskTypes = self.getTaskTypesAtState() # all the task types we can choose from at curent state
|
||||
weights = []
|
||||
for tt in taskTypes:
|
||||
endState = tt.getEndState()
|
||||
if endState != None :
|
||||
weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
|
||||
else:
|
||||
weights.append(10) # read data task, default to 10: TODO: change to a constant
|
||||
i = self._weighted_choice_sub(weights)
|
||||
# logger.debug(" (weighted random:{}/{}) ".format(i, len(taskTypes)))
|
||||
return taskTypes[i]
|
||||
|
||||
def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
|
||||
rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
|
||||
for i, w in enumerate(weights):
|
||||
rnd -= w
|
||||
if rnd < 0:
|
||||
return i
|
||||
|
||||
def _findCurrentState(self):
|
||||
dbc = self._dbConn
|
||||
ts = time.time()
|
||||
if dbc.query("show databases") == 0 : # no database?!
|
||||
# logger.debug("Found EMPTY state")
|
||||
logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
|
||||
return StateEmpty()
|
||||
dbc.execute("use db") # did not do this when openning connection
|
||||
if dbc.query("show tables") == 0 : # no tables
|
||||
# logger.debug("Found DB ONLY state")
|
||||
logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
|
||||
return StateDbOnly()
|
||||
if dbc.query("SELECT * FROM db.{}".format(self.getFixedSuperTableName()) ) == 0 : # no data
|
||||
# logger.debug("Found TABLE_ONLY state")
|
||||
logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
|
||||
return StateSuperTableOnly()
|
||||
else:
|
||||
# logger.debug("Found HAS_DATA state")
|
||||
logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
|
||||
return StateHasData()
|
||||
|
||||
def transition(self, tasks):
|
||||
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
|
||||
return # do nothing
|
||||
|
||||
self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps
|
||||
|
||||
# Generic Checks, first based on the start state
|
||||
if self._state.canCreateDb():
|
||||
self._state.assertIfExistThenSuccess(tasks, CreateDbTask)
|
||||
# self.assertAtMostOneSuccess(tasks, CreateDbTask) # not really, in case of multiple creation and drops
|
||||
|
||||
if self._state.canDropDb():
|
||||
self._state.assertIfExistThenSuccess(tasks, DropDbTask)
|
||||
# self.assertAtMostOneSuccess(tasks, DropDbTask) # not really in case of drop-create-drop
|
||||
|
||||
# if self._state.canCreateFixedTable():
|
||||
# self.assertIfExistThenSuccess(tasks, CreateFixedTableTask) # Not true, DB may be dropped
|
||||
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not really, in case of create-drop-create
|
||||
|
||||
# if self._state.canDropFixedTable():
|
||||
# self.assertIfExistThenSuccess(tasks, DropFixedTableTask) # Not True, the whole DB may be dropped
|
||||
# self.assertAtMostOneSuccess(tasks, DropFixedTableTask) # not really in case of drop-create-drop
|
||||
|
||||
# if self._state.canAddData():
|
||||
# self.assertIfExistThenSuccess(tasks, AddFixedDataTask) # not true actually
|
||||
|
||||
# if self._state.canReadData():
|
||||
# Nothing for sure
|
||||
|
||||
newState = self._findCurrentState()
|
||||
logger.debug("[STT] New DB state determined: {}".format(newState))
|
||||
self._state.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
|
||||
self._state = newState
|
||||
|
||||
class TaskExecutor():
|
||||
def __init__(self, curStep):
|
||||
self._curStep = curStep
|
||||
|
@ -923,8 +919,8 @@ class Task():
|
|||
# logger.debug("Allocating taskSN: {}".format(Task.taskSn))
|
||||
return Task.taskSn
|
||||
|
||||
def __init__(self, dbState: DbState, execStats: ExecutionStats):
|
||||
self._dbState = dbState
|
||||
def __init__(self, dbManager: DbManager, execStats: ExecutionStats):
|
||||
self._dbManager = dbManager
|
||||
self._workerThread = None
|
||||
self._err = None
|
||||
self._curStep = None
|
||||
|
@ -940,7 +936,7 @@ class Task():
|
|||
return self._err == None
|
||||
|
||||
def clone(self): # TODO: why do we need this again?
|
||||
newTask = self.__class__(self._dbState, self._execStats)
|
||||
newTask = self.__class__(self._dbManager, self._execStats)
|
||||
return newTask
|
||||
|
||||
def logDebug(self, msg):
|
||||
|
@ -966,7 +962,7 @@ class Task():
|
|||
self._executeInternal(te, wt) # TODO: no return value?
|
||||
except taos.error.ProgrammingError as err:
|
||||
self.logDebug("[=] Taos library exception: errno={:X}, msg: {}".format(err.errno, err))
|
||||
self._err = err
|
||||
self._err = err
|
||||
except:
|
||||
self.logDebug("[=] Unexpected exception")
|
||||
raise
|
||||
|
@ -976,7 +972,7 @@ class Task():
|
|||
self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.
|
||||
|
||||
def execSql(self, sql):
|
||||
return self._dbState.execute(sql)
|
||||
return self._dbManager.execute(sql)
|
||||
|
||||
|
||||
class ExecutionStats:
|
||||
|
@ -1043,20 +1039,22 @@ class ExecutionStats:
|
|||
|
||||
|
||||
class StateTransitionTask(Task):
|
||||
# @classmethod
|
||||
# def getAllTaskClasses(cls): # static
|
||||
# return cls.__subclasses__()
|
||||
@classmethod
|
||||
def getInfo(cls): # each sub class should supply their own information
|
||||
raise RuntimeError("Overriding method expected")
|
||||
|
||||
_endState = None
|
||||
@classmethod
|
||||
def getEndState(cls): # TODO: optimize by calling it fewer times
|
||||
raise RuntimeError("Overriding method expected")
|
||||
|
||||
# @classmethod
|
||||
# def getBeginStates(cls):
|
||||
# return cls.getInfo()[0]
|
||||
|
||||
@classmethod
|
||||
def getEndState(cls): # returning the class name
|
||||
return cls.getInfo()[0]
|
||||
# @classmethod
|
||||
# def getEndState(cls): # returning the class name
|
||||
# return cls.getInfo()[0]
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
|
@ -1066,15 +1064,10 @@ class StateTransitionTask(Task):
|
|||
def execute(self, wt: WorkerThread):
|
||||
super().execute(wt)
|
||||
|
||||
|
||||
|
||||
class CreateDbTask(StateTransitionTask):
|
||||
class TaskCreateDb(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
# [AnyState.STATE_EMPTY], # can begin from
|
||||
StateDbOnly() # end state
|
||||
]
|
||||
def getEndState(cls):
|
||||
return StateDbOnly()
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
|
@ -1083,13 +1076,10 @@ class CreateDbTask(StateTransitionTask):
|
|||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
wt.execSql("create database db")
|
||||
|
||||
class DropDbTask(StateTransitionTask):
|
||||
class TaskDropDb(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
# [AnyState.STATE_DB_ONLY, AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
StateEmpty()
|
||||
]
|
||||
def getEndState(cls):
|
||||
return StateEmpty()
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
|
@ -1099,122 +1089,140 @@ class DropDbTask(StateTransitionTask):
|
|||
wt.execSql("drop database db")
|
||||
logger.debug("[OPS] database dropped at {}".format(time.time()))
|
||||
|
||||
class CreateFixedSuperTableTask(StateTransitionTask):
|
||||
class TaskCreateSuperTable(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
# [AnyState.STATE_DB_ONLY],
|
||||
StateSuperTableOnly()
|
||||
]
|
||||
def getEndState(cls):
|
||||
return StateSuperTableOnly()
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canCreateFixedSuperTable()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tblName = self._dbState.getFixedSuperTableName()
|
||||
tblName = self._dbManager.getFixedSuperTableName()
|
||||
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
|
||||
# No need to create the regular tables, INSERT will do that automatically
|
||||
|
||||
|
||||
class ReadFixedDataTask(StateTransitionTask):
|
||||
class TaskReadData(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
None # meaning doesn't affect state
|
||||
]
|
||||
def getEndState(cls):
|
||||
return None # meaning doesn't affect state
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canReadData()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
sTbName = self._dbState.getFixedSuperTableName()
|
||||
sTbName = self._dbManager.getFixedSuperTableName()
|
||||
dbc = wt.getDbConn()
|
||||
dbc.query("select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later
|
||||
rTables = dbc.getQueryResult()
|
||||
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
|
||||
for rTbName in rTables : # regular tables
|
||||
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
|
||||
if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
|
||||
dbc.close()
|
||||
dbc.open()
|
||||
else:
|
||||
rTables = dbc.getQueryResult()
|
||||
# print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
|
||||
for rTbName in rTables : # regular tables
|
||||
dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
|
||||
|
||||
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
|
||||
|
||||
class DropFixedSuperTableTask(StateTransitionTask):
|
||||
class TaskDropSuperTable(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
StateDbOnly() # meaning doesn't affect state
|
||||
]
|
||||
def getEndState(cls):
|
||||
return StateDbOnly()
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canDropFixedSuperTable()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tblName = self._dbState.getFixedSuperTableName()
|
||||
tblName = self._dbManager.getFixedSuperTableName()
|
||||
wt.execSql("drop table db.{}".format(tblName))
|
||||
|
||||
class AddFixedDataTask(StateTransitionTask):
|
||||
class TaskAlterTags(StateTransitionTask):
|
||||
@classmethod
|
||||
def getInfo(cls):
|
||||
return [
|
||||
# [AnyState.STATE_TABLE_ONLY, AnyState.STATE_HAS_DATA],
|
||||
StateHasData()
|
||||
]
|
||||
def getEndState(cls):
|
||||
return None # meaning doesn't affect state
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canDropFixedSuperTable() # if we can drop it, we can alter tags
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tblName = self._dbManager.getFixedSuperTableName()
|
||||
dice = Dice.throw(4)
|
||||
if dice == 0 :
|
||||
wt.execSql("alter table db.{} add tag extraTag int".format(tblName))
|
||||
elif dice == 1 :
|
||||
wt.execSql("alter table db.{} drop tag extraTag".format(tblName))
|
||||
elif dice == 2 :
|
||||
wt.execSql("alter table db.{} drop tag newTag".format(tblName))
|
||||
else: # dice == 3
|
||||
wt.execSql("alter table db.{} change tag extraTag newTag".format(tblName))
|
||||
|
||||
class TaskAddData(StateTransitionTask):
|
||||
activeTable : Set[int] = set() # Track which table is being actively worked on
|
||||
LARGE_NUMBER_OF_TABLES = 35
|
||||
SMALL_NUMBER_OF_TABLES = 3
|
||||
LARGE_NUMBER_OF_RECORDS = 50
|
||||
SMALL_NUMBER_OF_RECORDS = 3
|
||||
|
||||
# We use these two files to record operations to DB, useful for power-off tests
|
||||
fAddLogReady = None
|
||||
fAddLogDone = None
|
||||
|
||||
@classmethod
|
||||
def prepToRecordOps(cls):
|
||||
if gConfig.record_ops :
|
||||
if ( cls.fAddLogReady == None ):
|
||||
logger.info("Recording in a file operations to be performed...")
|
||||
cls.fAddLogReady = open("add_log_ready.txt", "w")
|
||||
if ( cls.fAddLogDone == None ):
|
||||
logger.info("Recording in a file operations completed...")
|
||||
cls.fAddLogDone = open("add_log_done.txt", "w")
|
||||
|
||||
@classmethod
|
||||
def getEndState(cls):
|
||||
return StateHasData()
|
||||
|
||||
@classmethod
|
||||
def canBeginFrom(cls, state: AnyState):
|
||||
return state.canAddData()
|
||||
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
ds = self._dbState
|
||||
ds = self._dbManager
|
||||
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
|
||||
for i in range(10): # 0 to 9
|
||||
for j in range(10) :
|
||||
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
|
||||
i,
|
||||
tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
|
||||
random.shuffle(tblSeq)
|
||||
for i in tblSeq:
|
||||
if ( i in self.activeTable ): # wow already active
|
||||
# logger.info("Concurrent data insertion into table: {}".format(i))
|
||||
# print("ct({})".format(i), end="", flush=True) # Concurrent insertion into table
|
||||
print("x", end="", flush=True)
|
||||
else:
|
||||
self.activeTable.add(i) # marking it active
|
||||
# No need to shuffle data sequence, unless later we decide to do non-increment insertion
|
||||
for j in range(self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS) : # number of records per table
|
||||
nextInt = ds.getNextInt()
|
||||
regTableName = "db.reg_table_{}".format(i)
|
||||
if gConfig.record_ops:
|
||||
self.prepToRecordOps()
|
||||
self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogReady.flush()
|
||||
os.fsync(self.fAddLogReady)
|
||||
sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format(
|
||||
regTableName,
|
||||
ds.getFixedSuperTableName(),
|
||||
ds.getNextBinary(), ds.getNextFloat(),
|
||||
ds.getNextTick(), ds.getNextInt())
|
||||
ds.getNextTick(), nextInt)
|
||||
wt.execSql(sql)
|
||||
|
||||
|
||||
#---------- Non State-Transition Related Tasks ----------#
|
||||
|
||||
class CreateTableTask(Task):
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tIndex = self._dbState.addTable()
|
||||
self.logDebug("Creating a table {} ...".format(tIndex))
|
||||
wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
|
||||
self.logDebug("Table {} created.".format(tIndex))
|
||||
self._dbState.releaseTable(tIndex)
|
||||
|
||||
class DropTableTask(Task):
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
tableName = self._dbState.getTableNameToDelete()
|
||||
if ( not tableName ): # May be "False"
|
||||
self.logInfo("Cannot generate a table to delete, skipping...")
|
||||
return
|
||||
self.logInfo("Dropping a table db.{} ...".format(tableName))
|
||||
wt.execSql("drop table db.{}".format(tableName))
|
||||
|
||||
|
||||
|
||||
class AddDataTask(Task):
|
||||
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
|
||||
ds = self._dbState
|
||||
self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
|
||||
tIndex = ds.pickAndAllocateTable()
|
||||
if ( tIndex == None ):
|
||||
self.logInfo("No table found to add data, skipping...")
|
||||
return
|
||||
sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
|
||||
self.logDebug("[SQL] Executing SQL: {}".format(sql))
|
||||
wt.execSql(sql)
|
||||
ds.releaseTable(tIndex)
|
||||
self.logDebug("[OPS] Finished adding data")
|
||||
if gConfig.record_ops:
|
||||
self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
|
||||
self.fAddLogDone.flush()
|
||||
os.fsync(self.fAddLogDone)
|
||||
self.activeTable.discard(i) # not raising an error, unlike remove
|
||||
|
||||
|
||||
# Deterministic random number generator
|
||||
|
@ -1288,7 +1296,67 @@ class LoggingFilter(logging.Filter):
|
|||
# return False
|
||||
return True
|
||||
|
||||
class MainExec:
|
||||
@classmethod
|
||||
def runClient(cls):
|
||||
# resetDb = False # DEBUG only
|
||||
# dbState = DbState(resetDb) # DBEUG only!
|
||||
dbManager = DbManager() # Regular function
|
||||
Dice.seed(0) # initial seeding of dice
|
||||
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
|
||||
tc = ThreadCoordinator(thPool, dbManager)
|
||||
|
||||
tc.run()
|
||||
tc.logStats()
|
||||
dbManager.cleanUp()
|
||||
|
||||
@classmethod
|
||||
def runService(cls):
|
||||
print("Running service...")
|
||||
|
||||
@classmethod
|
||||
def runTemp(cls): # for debugging purposes
|
||||
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
|
||||
# dbc = dbState.getDbConn()
|
||||
# sTbName = dbState.getFixedSuperTableName()
|
||||
# dbc.execute("create database if not exists db")
|
||||
# if not dbState.getState().equals(StateEmpty()):
|
||||
# dbc.execute("use db")
|
||||
|
||||
# rTables = None
|
||||
# try: # the super table may not exist
|
||||
# sql = "select TBNAME from db.{}".format(sTbName)
|
||||
# logger.info("Finding out tables in super table: {}".format(sql))
|
||||
# dbc.query(sql) # TODO: analyze result set later
|
||||
# logger.info("Fetching result")
|
||||
# rTables = dbc.getQueryResult()
|
||||
# logger.info("Result: {}".format(rTables))
|
||||
# except taos.error.ProgrammingError as err:
|
||||
# logger.info("Initial Super table OPS error: {}".format(err))
|
||||
|
||||
# # sys.exit()
|
||||
# if ( not rTables == None):
|
||||
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
|
||||
# try:
|
||||
# for rTbName in rTables : # regular tables
|
||||
# ds = dbState
|
||||
# logger.info("Inserting into table: {}".format(rTbName[0]))
|
||||
# sql = "insert into db.{} values ('{}', {});".format(
|
||||
# rTbName[0],
|
||||
# ds.getNextTick(), ds.getNextInt())
|
||||
# dbc.execute(sql)
|
||||
# for rTbName in rTables : # regular tables
|
||||
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
|
||||
# logger.info("Initial READING operation is successful")
|
||||
# except taos.error.ProgrammingError as err:
|
||||
# logger.info("Initial WRITE/READ error: {}".format(err))
|
||||
|
||||
# Sandbox testing code
|
||||
# dbc = dbState.getDbConn()
|
||||
# while True:
|
||||
# rows = dbc.query("show databases")
|
||||
# print("Rows: {}, time={}".format(rows, time.time()))
|
||||
return
|
||||
|
||||
def main():
|
||||
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html
|
||||
|
@ -1301,20 +1369,27 @@ def main():
|
|||
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
|
||||
|
||||
'''))
|
||||
parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
|
||||
help='Use a single shared db connection (default: false)')
|
||||
|
||||
parser.add_argument('-d', '--debug', action='store_true',
|
||||
help='Turn on DEBUG mode for more logging (default: false)')
|
||||
parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
|
||||
parser.add_argument('-e', '--run-tdengine', action='store_true',
|
||||
help='Run TDengine service in foreground (default: false)')
|
||||
parser.add_argument('-l', '--larger-data', action='store_true',
|
||||
help='Write larger amount of data during write operations (default: false)')
|
||||
parser.add_argument('-p', '--per-thread-db-connection', action='store_false',
|
||||
help='Use a single shared db connection (default: false)')
|
||||
parser.add_argument('-r', '--record-ops', action='store_true',
|
||||
help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
|
||||
parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
|
||||
help='Maximum number of steps to run (default: 100)')
|
||||
parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
|
||||
parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
|
||||
help='Number of threads to run (default: 10)')
|
||||
|
||||
global gConfig
|
||||
gConfig = parser.parse_args()
|
||||
if len(sys.argv) == 1:
|
||||
parser.print_help()
|
||||
sys.exit()
|
||||
# if len(sys.argv) == 1:
|
||||
# parser.print_help()
|
||||
# sys.exit()
|
||||
|
||||
global logger
|
||||
logger = logging.getLogger('CrashGen')
|
||||
|
@ -1326,62 +1401,11 @@ def main():
|
|||
ch = logging.StreamHandler()
|
||||
logger.addHandler(ch)
|
||||
|
||||
# resetDb = False # DEBUG only
|
||||
# dbState = DbState(resetDb) # DBEUG only!
|
||||
dbState = DbState() # Regular function
|
||||
Dice.seed(0) # initial seeding of dice
|
||||
tc = ThreadCoordinator(
|
||||
ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
|
||||
# WorkDispatcher(dbState), # Obsolete?
|
||||
dbState
|
||||
)
|
||||
if gConfig.run_tdengine : # run server
|
||||
MainExec.runService()
|
||||
else :
|
||||
MainExec.runClient()
|
||||
|
||||
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
|
||||
# dbc = dbState.getDbConn()
|
||||
# sTbName = dbState.getFixedSuperTableName()
|
||||
# dbc.execute("create database if not exists db")
|
||||
# if not dbState.getState().equals(StateEmpty()):
|
||||
# dbc.execute("use db")
|
||||
|
||||
# rTables = None
|
||||
# try: # the super table may not exist
|
||||
# sql = "select TBNAME from db.{}".format(sTbName)
|
||||
# logger.info("Finding out tables in super table: {}".format(sql))
|
||||
# dbc.query(sql) # TODO: analyze result set later
|
||||
# logger.info("Fetching result")
|
||||
# rTables = dbc.getQueryResult()
|
||||
# logger.info("Result: {}".format(rTables))
|
||||
# except taos.error.ProgrammingError as err:
|
||||
# logger.info("Initial Super table OPS error: {}".format(err))
|
||||
|
||||
# # sys.exit()
|
||||
# if ( not rTables == None):
|
||||
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
|
||||
# try:
|
||||
# for rTbName in rTables : # regular tables
|
||||
# ds = dbState
|
||||
# logger.info("Inserting into table: {}".format(rTbName[0]))
|
||||
# sql = "insert into db.{} values ('{}', {});".format(
|
||||
# rTbName[0],
|
||||
# ds.getNextTick(), ds.getNextInt())
|
||||
# dbc.execute(sql)
|
||||
# for rTbName in rTables : # regular tables
|
||||
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
|
||||
# logger.info("Initial READING operation is successful")
|
||||
# except taos.error.ProgrammingError as err:
|
||||
# logger.info("Initial WRITE/READ error: {}".format(err))
|
||||
|
||||
|
||||
|
||||
# Sandbox testing code
|
||||
# dbc = dbState.getDbConn()
|
||||
# while True:
|
||||
# rows = dbc.query("show databases")
|
||||
# print("Rows: {}, time={}".format(rows, time.time()))
|
||||
|
||||
tc.run()
|
||||
tc.logStats()
|
||||
dbState.cleanUp()
|
||||
|
||||
# logger.info("Crash_Gen execution finished")
|
||||
|
||||
|
|
|
@ -153,13 +153,13 @@ print $rows $data00 $data10 $data20
|
|||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != tb3 then
|
||||
if $data00 != tb1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != tb2 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != tb1 then
|
||||
if $data20 != tb3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -221,13 +221,13 @@ sql show tables
|
|||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != tb3 then
|
||||
if $data00 != tb1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != tb2 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != tb1 then
|
||||
if $data20 != tb3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -80,6 +80,7 @@ print $rows
|
|||
|
||||
sql select ts from group_mt0 where ts>='1970-1-1 8:1:43' and ts<='1970-1-1 8:1:43.500' limit 8000 offset 0;
|
||||
if $rows != 4008 then
|
||||
print expect 4008, actual:$rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -7,15 +7,21 @@ INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
|||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
|
||||
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||
add_executable(insertPerTable insertPerTable.c)
|
||||
target_link_libraries(insertPerTable taos_static pthread)
|
||||
#add_executable(insertPerTable insertPerTable.c)
|
||||
#target_link_libraries(insertPerTable taos_static pthread)
|
||||
|
||||
add_executable(insertPerRow insertPerRow.c)
|
||||
target_link_libraries(insertPerRow taos_static pthread)
|
||||
#add_executable(insertPerRow insertPerRow.c)
|
||||
#target_link_libraries(insertPerRow taos_static pthread)
|
||||
|
||||
add_executable(importOneRow importOneRow.c)
|
||||
target_link_libraries(importOneRow taos_static pthread)
|
||||
#add_executable(importOneRow importOneRow.c)
|
||||
#target_link_libraries(importOneRow taos_static pthread)
|
||||
|
||||
add_executable(importPerTable importPerTable.c)
|
||||
target_link_libraries(importPerTable taos_static pthread)
|
||||
#add_executable(importPerTable importPerTable.c)
|
||||
#target_link_libraries(importPerTable taos_static pthread)
|
||||
|
||||
#add_executable(hashPerformance hashPerformance.c)
|
||||
#target_link_libraries(hashPerformance taos_static tutil common pthread)
|
||||
|
||||
add_executable(createTablePerformance createTablePerformance.c)
|
||||
target_link_libraries(createTablePerformance taos_static tutil common pthread)
|
||||
ENDIF()
|
||||
|
|
|
@ -0,0 +1,232 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taos.h"
|
||||
#include "tulog.h"
|
||||
#include "ttime.h"
|
||||
#include "tutil.h"
|
||||
#include "tglobal.h"
|
||||
#include "hash.h"
|
||||
|
||||
#define MAX_RANDOM_POINTS 20000
|
||||
#define GREEN "\033[1;32m"
|
||||
#define NC "\033[0m"
|
||||
|
||||
char dbName[32] = "db";
|
||||
char stableName[64] = "st";
|
||||
int32_t numOfThreads = 30;
|
||||
int32_t numOfTables = 100000;
|
||||
int32_t maxTables = 5000;
|
||||
int32_t numOfColumns = 2;
|
||||
|
||||
typedef struct {
|
||||
int32_t tableBeginIndex;
|
||||
int32_t tableEndIndex;
|
||||
int32_t threadIndex;
|
||||
char dbName[32];
|
||||
char stableName[64];
|
||||
float createTableSpeed;
|
||||
pthread_t thread;
|
||||
} SThreadInfo;
|
||||
|
||||
void shellParseArgument(int argc, char *argv[]);
|
||||
void *threadFunc(void *param);
|
||||
void createDbAndSTable();
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
shellParseArgument(argc, argv);
|
||||
taos_init();
|
||||
createDbAndSTable();
|
||||
|
||||
pPrint("%d threads are spawned to create table", numOfThreads);
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo));
|
||||
|
||||
int32_t numOfTablesPerThread = numOfTables / numOfThreads;
|
||||
numOfTables = numOfTablesPerThread * numOfThreads;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
pInfo[i].tableBeginIndex = i * numOfTablesPerThread;
|
||||
pInfo[i].tableEndIndex = (i + 1) * numOfTablesPerThread;
|
||||
pInfo[i].threadIndex = i;
|
||||
strcpy(pInfo[i].dbName, dbName);
|
||||
strcpy(pInfo[i].stableName, stableName);
|
||||
pthread_create(&(pInfo[i].thread), &thattr, threadFunc, (void *)(pInfo + i));
|
||||
}
|
||||
|
||||
taosMsleep(300);
|
||||
for (int i = 0; i < numOfThreads; i++) {
|
||||
pthread_join(pInfo[i].thread, NULL);
|
||||
}
|
||||
|
||||
float createTableSpeed = 0;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
createTableSpeed += pInfo[i].createTableSpeed;
|
||||
}
|
||||
|
||||
pPrint("%s total speed:%.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC);
|
||||
|
||||
pthread_attr_destroy(&thattr);
|
||||
free(pInfo);
|
||||
}
|
||||
|
||||
void createDbAndSTable() {
|
||||
pPrint("start to create db and stable");
|
||||
char qstr[64000];
|
||||
|
||||
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (con == NULL) {
|
||||
pError("failed to connect to DB, reason:%s", taos_errstr(con));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
sprintf(qstr, "create database if not exists %s maxtables %d", dbName, maxTables);
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create database:%s, sql:%s, code:%d reason:%s", dbName, qstr, taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
|
||||
sprintf(qstr, "use %s", dbName);
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to use db, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
|
||||
int len = sprintf(qstr, "create table if not exists %s(ts timestamp", stableName);
|
||||
for (int32_t f = 0; f < numOfColumns - 1; ++f) {
|
||||
len += sprintf(qstr + len, ", f%d double", f);
|
||||
}
|
||||
sprintf(qstr + len, ") tags(t int)");
|
||||
|
||||
pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create stable, code:%d reason:%s", taos_errno(con), taos_errstr(con));
|
||||
exit(0);
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
|
||||
taos_close(con);
|
||||
}
|
||||
|
||||
void *threadFunc(void *param) {
|
||||
SThreadInfo *pInfo = (SThreadInfo *)param;
|
||||
char qstr[65000];
|
||||
int code;
|
||||
|
||||
TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (con == NULL) {
|
||||
pError("index:%d, failed to connect to DB, reason:%s", pInfo->threadIndex, taos_errstr(con));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
sprintf(qstr, "use %s", pInfo->dbName);
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
taos_free_result(pSql);
|
||||
|
||||
int64_t startMs = taosGetTimestampMs();
|
||||
|
||||
for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) {
|
||||
sprintf(qstr, "create table if not exists %s%d using %s tags(%d)", stableName, t, stableName, t);
|
||||
TAOS_RES *pSql = taos_query(con, qstr);
|
||||
code = taos_errno(pSql);
|
||||
if (code != 0) {
|
||||
pError("failed to create table %s%d, reason:%s", stableName, t, taos_errstr(con));
|
||||
}
|
||||
taos_free_result(pSql);
|
||||
}
|
||||
|
||||
float createTableSpeed = 0;
|
||||
for (int i = 0; i < numOfThreads; ++i) {
|
||||
createTableSpeed += pInfo[i].createTableSpeed;
|
||||
}
|
||||
|
||||
int64_t endMs = taosGetTimestampMs();
|
||||
int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex;
|
||||
float seconds = (endMs - startMs) / 1000.0;
|
||||
float speed = totalTables / seconds;
|
||||
pInfo->createTableSpeed = speed;
|
||||
|
||||
pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed);
|
||||
taos_close(con);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void printHelp() {
|
||||
char indent[10] = " ";
|
||||
printf("Used to test the performance while create table\n");
|
||||
|
||||
printf("%s%s\n", indent, "-c");
|
||||
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
|
||||
printf("%s%s\n", indent, "-d");
|
||||
printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", dbName);
|
||||
printf("%s%s\n", indent, "-s");
|
||||
printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", stableName);
|
||||
printf("%s%s\n", indent, "-t");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads);
|
||||
printf("%s%s\n", indent, "-n");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables);
|
||||
printf("%s%s\n", indent, "-columns");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfColumns, default is ", numOfColumns);
|
||||
printf("%s%s\n", indent, "-tables");
|
||||
printf("%s%s%s%d\n", indent, indent, "Database parameters tables, default is ", maxTables);
|
||||
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
void shellParseArgument(int argc, char *argv[]) {
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||
printHelp();
|
||||
exit(0);
|
||||
} else if (strcmp(argv[i], "-d") == 0) {
|
||||
strcpy(dbName, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-c") == 0) {
|
||||
strcpy(configDir, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s") == 0) {
|
||||
strcpy(stableName, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t") == 0) {
|
||||
numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0) {
|
||||
numOfTables = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-tables") == 0) {
|
||||
maxTables = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-columns") == 0) {
|
||||
numOfColumns = atoi(argv[++i]);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
pPrint("%s dbName:%s %s", GREEN, dbName, NC);
|
||||
pPrint("%s stableName:%s %s", GREEN, stableName, NC);
|
||||
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
|
||||
pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC);
|
||||
pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC);
|
||||
pPrint("%s numOfColumns:%d %s", GREEN, numOfColumns, NC);
|
||||
pPrint("%s dbPara maxTables:%d %s", GREEN, maxTables, NC);
|
||||
|
||||
pPrint("%s start create table performace test %s", GREEN, NC);
|
||||
}
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taos.h"
|
||||
#include "tulog.h"
|
||||
#include "ttime.h"
|
||||
#include "tutil.h"
|
||||
#include "hash.h"
|
||||
|
||||
#define MAX_RANDOM_POINTS 20000
|
||||
#define GREEN "\033[1;32m"
|
||||
#define NC "\033[0m"
|
||||
|
||||
int32_t capacity = 100000;
|
||||
int32_t q1Times = 1;
|
||||
int32_t q2Times = 1;
|
||||
int32_t keyNum = 100000;
|
||||
int32_t printInterval = 10000;
|
||||
|
||||
typedef struct HashTestRow {
|
||||
int32_t size;
|
||||
void * ptr;
|
||||
} HashTestRow;
|
||||
|
||||
void shellParseArgument(int argc, char *argv[]);
|
||||
|
||||
void testHashPerformance() {
|
||||
int64_t initialMs = taosGetTimestampMs();
|
||||
_hash_fn_t hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
void * hashHandle = taosHashInit(capacity, hashFp, true);
|
||||
|
||||
int64_t startMs = taosGetTimestampMs();
|
||||
float seconds = (startMs - initialMs) / 1000.0;
|
||||
pPrint("initial time %.2f sec", seconds);
|
||||
|
||||
for (int32_t t = 1; t <= keyNum; ++t) {
|
||||
HashTestRow row = {0};
|
||||
char key[100] = {0};
|
||||
int32_t keySize = sprintf(key, "0.db.st%d", t);
|
||||
|
||||
for (int32_t q = 0; q < q1Times; q++) {
|
||||
taosHashGet(hashHandle, &key, keySize);
|
||||
}
|
||||
|
||||
taosHashPut(hashHandle, key, keySize, &row, sizeof(HashTestRow));
|
||||
|
||||
for (int32_t q = 0; q < q2Times; q++) {
|
||||
taosHashGet(hashHandle, &key, keySize);
|
||||
}
|
||||
|
||||
if (t % printInterval == 0) {
|
||||
int64_t endMs = taosGetTimestampMs();
|
||||
int64_t hashSize = taosHashGetSize(hashHandle);
|
||||
float seconds = (endMs - startMs) / 1000.0;
|
||||
float speed = printInterval / seconds;
|
||||
pPrint("time:%.2f sec, speed:%.1f rows/second, hashSize:%ld", seconds, speed, hashSize);
|
||||
startMs = endMs;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t endMs = taosGetTimestampMs();
|
||||
int64_t hashSize = taosHashGetSize(hashHandle);
|
||||
seconds = (endMs - initialMs) / 1000.0;
|
||||
float speed = hashSize / seconds;
|
||||
|
||||
pPrint("total time:%.2f sec, avg speed:%.1f rows/second, hashSize:%ld", seconds, speed, hashSize);
|
||||
taosHashCleanup(hashHandle);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
shellParseArgument(argc, argv);
|
||||
testHashPerformance();
|
||||
}
|
||||
|
||||
void printHelp() {
|
||||
char indent[10] = " ";
|
||||
printf("Used to test the performance of cache\n");
|
||||
|
||||
printf("%s%s\n", indent, "-k");
|
||||
printf("%s%s%s%d\n", indent, indent, "key num, default is ", keyNum);
|
||||
printf("%s%s\n", indent, "-p");
|
||||
printf("%s%s%s%d\n", indent, indent, "print interval while put into hash, default is ", printInterval);
|
||||
printf("%s%s\n", indent, "-c");
|
||||
printf("%s%s%s%d\n", indent, indent, "the initial capacity of hash ", capacity);
|
||||
printf("%s%s\n", indent, "-q1");
|
||||
printf("%s%s%s%d\n", indent, indent, "query times before put into hash", q1Times);
|
||||
printf("%s%s\n", indent, "-q2");
|
||||
printf("%s%s%s%d\n", indent, indent, "query times after put into hash", q2Times);
|
||||
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
void shellParseArgument(int argc, char *argv[]) {
|
||||
for (int i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||
printHelp();
|
||||
exit(0);
|
||||
} else if (strcmp(argv[i], "-k") == 0) {
|
||||
keyNum = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-p") == 0) {
|
||||
printInterval = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-c") == 0) {
|
||||
capacity = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-q1") == 0) {
|
||||
q1Times = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-q2") == 0) {
|
||||
q2Times = atoi(argv[++i]);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
pPrint("%s capacity:%d %s", GREEN, capacity, NC);
|
||||
pPrint("%s printInterval:%d %s", GREEN, printInterval, NC);
|
||||
pPrint("%s q1Times:%d %s", GREEN, q1Times, NC);
|
||||
pPrint("%s q2Times:%d %s", GREEN, q2Times, NC);
|
||||
pPrint("%s keyNum:%d %s", GREEN, keyNum, NC);
|
||||
}
|
Loading…
Reference in New Issue