Merge branch 'feature/query' of https://github.com/taosdata/TDengine into feature/query

This commit is contained in:
Tao Liu 2020-06-03 07:32:17 +00:00
commit 6943250841
42 changed files with 724 additions and 1209 deletions

View File

@ -132,10 +132,7 @@ bool tscIsSelectivityWithTagQuery(SSqlCmd* pCmd);
void tscAddSpecialColumnForSelect(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId, SColumnIndex* pIndex,
SSchema* pColSchema, int16_t isTag);
//void addRequiredTagColumn(SQueryInfo* pQueryInfo, int32_t tagColIndex, int32_t tableIndex);
void addRequiredTagColumn(STableMetaInfo* pTableMetaInfo, SColumnIndex* index);
int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertData(char* sqlstr);

View File

@ -793,7 +793,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
STableMetaInfo *pSTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
tscSetTableId(pSTableMeterMetaInfo, &sToken, pSql);
tscSetTableFullName(pSTableMeterMetaInfo, &sToken, pSql);
strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_TABLE_ID_LEN);
code = tscGetTableMeta(pSql, pSTableMeterMetaInfo);
@ -834,9 +834,8 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
sql += index;
if (TK_STRING == sToken.type) {
sToken.n = strdequote(sToken.z);
strtrim(sToken.z);
sToken.n = (uint32_t)strlen(sToken.z);
strdequote(sToken.z);
sToken.n = strtrim(sToken.z);
}
if (sToken.type == TK_RP) {
@ -925,7 +924,11 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
for (int32_t i = 0; i < spd.numOfCols; ++i) {
if (!spd.hasVal[i]) { // current tag column do not have any value to insert, set it to null
setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
if (pTagSchema[i].type == TSDB_DATA_TYPE_BINARY || pTagSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(ptr, pTagSchema[i].type);
} else {
setNull(ptr, pTagSchema[i].type, pTagSchema[i].bytes);
}
}
ptr += pTagSchema[i].bytes;
@ -944,7 +947,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
}
int32_t ret = tscSetTableId(pTableMetaInfo, &tableToken, pSql);
int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
@ -1087,7 +1090,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
goto _error_clean;
}
if ((code = tscSetTableId(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _error_clean;
}
@ -1205,9 +1208,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
str += index;
if (TK_STRING == sToken.type) {
sToken.n = strdequote(sToken.z);
strtrim(sToken.z);
sToken.n = (uint32_t)strlen(sToken.z);
strdequote(sToken.z);
sToken.n = strtrim(sToken.z);
}
if (sToken.type == TK_RP) {

View File

@ -88,13 +88,23 @@ void tscSaveSlowQueryFp(void *handle, void *tmrId) {
}
void tscSaveSlowQuery(SSqlObj *pSql) {
const static int64_t SLOW_QUERY_INTERVAL = 3000000L;
if (pSql->res.useconds < SLOW_QUERY_INTERVAL) return;
const static int64_t SLOW_QUERY_INTERVAL = 3000000L; // todo configurable
size_t size = 200; // other part of sql string, expect the main sql str
if (pSql->res.useconds < SLOW_QUERY_INTERVAL) {
return;
}
tscTrace("%p query time:%" PRId64 " sql:%s", pSql, pSql->res.useconds, pSql->sqlstr);
char *sql = malloc(200);
int len = snprintf(sql, 200, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
int32_t sqlSize = TSDB_SHOW_SQL_LEN + size;
char *sql = malloc(sqlSize);
if (sql == NULL) {
tscError("%p failed to allocate memory to sent slow query to dnode", pSql);
return;
}
int len = snprintf(sql, size, "insert into %s.slowquery values(now, '%s', %" PRId64 ", %" PRId64 ", '", tsMonitorDbName,
pSql->pTscObj->user, pSql->stime, pSql->res.useconds);
int sqlLen = snprintf(sql + len, TSDB_SHOW_SQL_LEN, "%s", pSql->sqlstr);
if (sqlLen > TSDB_SHOW_SQL_LEN - 1) {
@ -102,8 +112,8 @@ void tscSaveSlowQuery(SSqlObj *pSql) {
} else {
sqlLen += len;
}
strcpy(sql + sqlLen, "')");
taosTmrStart(tscSaveSlowQueryFp, 200, sql, tscTmr);
}

View File

@ -163,8 +163,7 @@ static int32_t handlePassword(SSqlCmd* pCmd, SSQLToken* pPwd) {
}
strdequote(pPwd->z);
strtrim(pPwd->z); // trim space before and after passwords
pPwd->n = strlen(pPwd->z);
pPwd->n = strtrim(pPwd->z); // trim space before and after passwords
if (pPwd->n <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
@ -226,7 +225,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
} else if (pInfo->type == TSDB_SQL_DROP_TABLE) {
assert(pInfo->pDCLInfo->nTokens == 1);
if (tscSetTableId(pTableMetaInfo, pzName, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pTableMetaInfo, pzName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
@ -353,7 +352,7 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
if (tscSetTableId(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pTableMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -686,7 +685,7 @@ int32_t parseSlidingClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
return TSDB_CODE_SUCCESS;
}
int32_t tscSetTableId(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql) {
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SSQLToken* pzTableName, SSqlObj* pSql) {
const char* msg = "name too long";
SSqlCmd* pCmd = &pSql->cmd;
@ -1465,7 +1464,6 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExprItem* pItem, bool finalResult) {
STableMetaInfo* pTableMetaInfo = NULL;
int32_t optr = pItem->pNode->nSQLOptr;
const char* msg1 = "not support column types";
@ -4294,7 +4292,7 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
if (tscSetTableId(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pTableMetaInfo, &(pAlterSQL->name), pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
@ -4646,8 +4644,7 @@ int32_t validateColumnName(char* name) {
if (token.type == TK_STRING) {
strdequote(token.z);
strtrim(token.z);
token.n = (uint32_t)strlen(token.z);
token.n = strtrim(token.z);
int32_t k = tSQLGetToken(token.z, &token.type);
if (k != token.n) {
@ -5491,7 +5488,7 @@ int32_t doCheckForCreateTable(SSqlObj* pSql, int32_t subClauseIndex, SSqlInfo* p
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableId(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -5546,7 +5543,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (tscSetTableId(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pStableMeterMetaInfo, pToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
@ -5592,7 +5589,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
}
STableMetaInfo* pTableMeterMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
ret = tscSetTableId(pTableMeterMetaInfo, &pInfo->pCreateTableInfo->name, pSql);
ret = tscSetTableFullName(pTableMeterMetaInfo, &pInfo->pCreateTableInfo->name, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
@ -5635,7 +5632,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
if (tscSetTableId(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pTableMetaInfo, &srcToken, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg2);
}
@ -5666,7 +5663,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
}
// set the created table[stream] name
if (tscSetTableId(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pTableMetaInfo, pzTableName, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(pQueryInfo->msg, msg1);
}
@ -5775,7 +5772,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) {
STableMetaInfo* pMeterInfo1 = tscGetMetaInfo(pQueryInfo, i);
SSQLToken t = {.type = TSDB_DATA_TYPE_BINARY, .n = pTableItem->nLen, .z = pTableItem->pz};
if (tscSetTableId(pMeterInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
if (tscSetTableFullName(pMeterInfo1, &t, pSql) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}

View File

@ -817,9 +817,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
tblName[len] = '\0';
str = nextStr + 1;
strtrim(tblName);
len = (uint32_t)strlen(tblName);
len = strtrim(tblName);
SSQLToken sToken = {.n = len, .type = TK_ID, .z = tblName};
tSQLGetToken(tblName, &sToken.type);
@ -831,7 +829,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
return code;
}
if ((code = tscSetTableId(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -1715,6 +1715,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
pParentObj->res.numOfRows += numOfRows;
}
taos_free_result(tres);
int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (completed < total) {
return;
@ -1732,7 +1733,7 @@ static void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
pParentObj->fp = pParentObj->fetchFp;
// all data has been sent to vnode, call user function
(*pParentObj->fp)(pParentObj->param, tres, numOfRows);
(*pParentObj->fp)(pParentObj->param, pParentObj, numOfRows);
}
int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {

View File

@ -1231,9 +1231,8 @@ void tscColumnListDestroy(SArray* pColumnList) {
*
*/
static int32_t validateQuoteToken(SSQLToken* pToken) {
pToken->n = strdequote(pToken->z);
strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z);
strdequote(pToken->z);
pToken->n = strtrim(pToken->z);
int32_t k = tSQLGetToken(pToken->z, &pToken->type);
@ -1255,9 +1254,8 @@ int32_t tscValidateName(SSQLToken* pToken) {
char* sep = strnchr(pToken->z, TS_PATH_DELIMITER[0], pToken->n, true);
if (sep == NULL) { // single part
if (pToken->type == TK_STRING) {
pToken->n = strdequote(pToken->z);
strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z);
strdequote(pToken->z);
pToken->n = strtrim(pToken->z);
int len = tSQLGetToken(pToken->z, &pToken->type);
@ -1282,8 +1280,7 @@ int32_t tscValidateName(SSQLToken* pToken) {
char* pStr = pToken->z;
if (pToken->type == TK_SPACE) {
strtrim(pToken->z);
pToken->n = (uint32_t)strlen(pToken->z);
pToken->n = strtrim(pToken->z);
}
pToken->n = tSQLGetToken(pToken->z, &pToken->type);

View File

@ -322,7 +322,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_QUERY_TYPE_SUBQUERY 0x02u
#define TSDB_QUERY_TYPE_STABLE_SUBQUERY 0x04u // two-stage subquery for super table
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_TABLE_QUERY 0x08u // query ordinary table; below only apply to client side
#define TSDB_QUERY_TYPE_STABLE_QUERY 0x10u // query on super table
#define TSDB_QUERY_TYPE_JOIN_QUERY 0x20u // join query
#define TSDB_QUERY_TYPE_PROJECTION_QUERY 0x40u // select *,columns... query

View File

@ -93,15 +93,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NO_USER_FROM_CONN, 0, 0x0185, "can not get
TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 0x0200, "table already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_ID, 0, 0x0201, "invalid table id")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 0x0202, "invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_NAME, 0, 0x0203, "invalid table name")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 0x0204, "no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_ALREAY_EXIST, 0, 0x0205, "tag already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_NOT_EXIST, 0, 0x0206, "tag not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_ALREAY_EXIST, 0, 0x0207, "field already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_NOT_EXIST, 0, 0x0208, "field not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_COL_NAME_TOO_LONG, 0, 0x0209, "column name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TAGS, 0, 0x020A, "too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 0x0203, "no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_ALREAY_EXIST, 0, 0x0204, "tag already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TAG_NOT_EXIST, 0, 0x0205, "tag not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_ALREAY_EXIST, 0, 0x0206, "field already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_FIELD_NOT_EXIST, 0, 0x0207, "field not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_COL_NAME_TOO_LONG, 0, 0x0208, "column name too long")
TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TAGS, 0, 0x0209, "too many tags")
// dnode & mnode
TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 0x0280, "no enough dnodes")

View File

@ -708,6 +708,7 @@ void *readTable(void *sarg) {
sprintf(command, "select %s from %s%d where ts>= %" PRId64, aggreFunc[j], tb_prefix, i, sTime);
double t = getCurrentTime();
<<<<<<< HEAD
/*
if (taos_query(taos, command) != 0) {
fprintf(stderr, "Failed to query\n");
@ -718,18 +719,26 @@ void *readTable(void *sarg) {
TAOS_RES *result = taos_query(taos, command) ;
if (result == NULL) {
fprintf(stderr, "Failed to retreive results:%s\n", taos_errstr(taos));
=======
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos));
taos_free_result(pSql);
>>>>>>> 2f976c4f62b7a68626350e3ac15eddff20035b59
taos_close(taos);
exit(1);
exit(EXIT_FAILURE);
}
while (taos_fetch_row(result) != NULL) {
while (taos_fetch_row(pSql) != NULL) {
count++;
}
t = getCurrentTime() - t;
totalT += t;
taos_free_result(result);
taos_free_result(pSql);
}
fprintf(fp, "|%10s | %10d | %12.2f | %10.2f |\n",
@ -780,20 +789,18 @@ void *readMetric(void *sarg) {
fprintf(fp, "%s\n", command);
double t = getCurrentTime();
// if (taos_query(taos, command) != 0) {
// fprintf(stderr, "Failed to query\n");
// taos_close(taos);
// exit(EXIT_FAILURE);
// }
TAOS_RES *result = taos_query(taos,command);
if (result == NULL) {
fprintf(stderr, "Failed to retreive results:%s\n", taos_errstr(taos));
TAOS_RES *pSql = taos_query(taos, command);
int32_t code = taos_errno(pSql);
if (code != 0) {
fprintf(stderr, "Failed to query:%s\n", taos_errstr(taos));
taos_free_result(pSql);
taos_close(taos);
exit(1);
}
int count = 0;
while (taos_fetch_row(result) != NULL) {
while (taos_fetch_row(pSql) != NULL) {
count++;
}
t = getCurrentTime() - t;
@ -801,7 +808,7 @@ void *readMetric(void *sarg) {
fprintf(fp, "| Speed: %12.2f(per s) | Latency: %.4f(ms) |\n", num_of_tables * num_of_DPT / t, t * 1000);
printf("select %10s took %.6f second(s)\n\n", aggreFunc[j], t);
taos_free_result(result);
taos_free_result(pSql);
}
fprintf(fp, "\n");
}
@ -812,10 +819,19 @@ void *readMetric(void *sarg) {
void queryDB(TAOS *taos, char *command) {
int i = 5;
while (i > 0) {
if (taos_query(taos, command) == 0) break;
TAOS_RES *pSql = NULL;
int32_t code = -1;
while (i > 0 && code != 0) {
pSql = taos_query(taos, command);
code = taos_errno(pSql);
taos_free_result(pSql);
pSql = NULL;
if (code == 0) {
break;
}
i--;
}
if (i == 0) {
fprintf(stderr, "Failed to run %s, reason: %s\n", command, taos_errstr(taos));
taos_close(taos);
@ -948,6 +964,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
break;
}
}
tb_info->timestamp = tmp_time;
taos_query_a(tb_info->taos, buffer, callBack, tb_info);

View File

@ -372,15 +372,12 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
memset(pTableRecordInfo, 0, sizeof(STableRecordInfo));
sprintf(command, "show tables like %s", table);
/*
if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s\n", command);
return -1;
}
*/
result = taos_query(taos, command) ;
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
TAOS_RES *result = taos_query(taos, command);\
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
taos_free_result(result);
return -1;
}
@ -401,14 +398,12 @@ int taosGetTableRecordInfo(char *table, STableRecordInfo *pTableRecordInfo) {
if (isSet) return 0;
sprintf(command, "show stables like %s", table);
/* if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s\n", command);
return -1;
}
*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
taos_free_result(result);
return -1;
}
@ -468,14 +463,11 @@ int taosDumpOut(SDumpArguments *arguments) {
taosDumpCharset(fp);
sprintf(command, "show databases");
/*if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(taos));
goto _exit_failure;
}*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command: %s, reason: %s\n", command, taos_errstr(taos));
taos_free_result(result);
goto _exit_failure;
}
@ -613,7 +605,7 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
taosDumpCreateDbClause(dbInfo, arguments->with_property, fp);
sprintf(command, "use %s", dbInfo->name);
if (taos_query(taos, command) == NULL) {
if (taos_errno(taos_query(taos, command)) != 0) {
fprintf(stderr, "invalid database %s\n", dbInfo->name);
return -1;
}
@ -621,14 +613,11 @@ int taosDumpDb(SDbInfo *dbInfo, SDumpArguments *arguments, FILE *fp) {
fprintf(fp, "USE %s\n\n", dbInfo->name);
sprintf(command, "show tables");
/* if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s\n", command);
return -1;
}*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
result = taos_query(taos,command);
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
taos_free_result(result);
return -1;
}
@ -726,14 +715,11 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
TAOS_ROW row = NULL;
sprintf(command, "select %s from %s limit 1", tableDes->cols[counter].field, tableDes->name);
/*if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s\n", command);
return;
}*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
return;
}
@ -807,14 +793,12 @@ int taosGetTableDes(char *table, STableDef *tableDes) {
int count = 0;
sprintf(command, "describe %s", table);
/*if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s\n", command);
return -1;
}*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
taos_free_result(result);
return -1;
}
@ -890,14 +874,11 @@ int32_t taosDumpMetric(char *metric, SDumpArguments *arguments, FILE *fp) {
strcpy(tableRecord.metric, metric);
sprintf(command, "select tbname from %s", metric);
/*if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s\n", command);
return -1;
}*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, error: %s\n", command, taos_errstr(result));
taos_free_result(result);
return -1;
}
@ -943,18 +924,16 @@ int taosDumpTableData(FILE *fp, char *tbname, SDumpArguments *arguments) {
sprintf(command, "select * from %s where _c0 >= %" PRId64 " and _c0 <= %" PRId64 " order by _c0 asc", tbname, arguments->start_time,
arguments->end_time);
/*if (taos_query(taos, command) != 0) {
fprintf(stderr, "failed to run command %s, reason: %s\n", command, taos_errstr(taos));
return -1;
}*/
result = taos_query(taos, command);
if (result == NULL) {
fprintf(stderr, "failed to use result\n");
int32_t code = taos_errno(result);
if (code != 0) {
fprintf(stderr, "failed to run command %s, reason: %s\n", command, taos_errstr(result));
taos_free_result(result);
return -1;
}
numFields = taos_field_count(taos);
numFields = taos_field_count(result);
assert(numFields > 0);
TAOS_FIELD *fields = taos_fetch_fields(result);
tbuf = (char *)malloc(COMMAND_SIZE);
@ -1243,9 +1222,14 @@ int taosDumpIn(SDumpArguments *arguments) {
tcommand = command;
}
taosReplaceCtrlChar(tcommand);
if (taos_query(taos, tcommand) == NULL)
result = taos_query(taos, tcommand);
int32_t code = taos_errno(result);
if (code != 0)
{
fprintf(stderr, "linenu:%" PRId64 " failed to run command %s reason: %s \ncontinue...\n", linenu, command,
taos_errstr(taos));
}
taos_free_result(result);
}
pstr = command;

View File

@ -211,7 +211,7 @@ static int32_t mnodeProcessRetrieveMsg(SMnodeMsg *pMsg) {
pMsg->rpcRsp.rsp = pRsp;
pMsg->rpcRsp.len = size;
if (rowsToRead == 0 || rowsRead == rowsToRead) {
if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows - pShow->numOfReads == rowsToRead)) {
pRsp->completed = 1;
mnodeReleaseShowObj(pShow, true);
} else {

View File

@ -742,7 +742,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) {
if (!pInfo->createFlag) {
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
return TSDB_CODE_INVALID_TABLE_NAME;
return TSDB_CODE_INVALID_TABLE_ID;
} else {
mTrace("table:%s, failed to get table meta, start auto create table ", pInfo->tableId);
return mnodeAutoCreateChildTable(pMsg);

View File

@ -58,10 +58,10 @@ void httpProcessMultiSqlRetrieveCallBack(void *param, TAOS_RES *result, int numO
if (numOfRows < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, retrieve failed code:%s, sql:%s",
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, tstrerror(numOfRows), sql);
} else {
taos_free_result(result);
}
}
taos_free_result(result);
if (singleCmd->cmdReturnType == HTTP_CMD_RETURN_TYPE_WITH_RETURN && encode->stopJsonFp) {
(encode->stopJsonFp)(pContext, singleCmd);
}
@ -103,12 +103,15 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
}
multiCmds->pos++;
httpProcessMultiSql(pContext);
taos_free_result(result);
return;
}
if (result == NULL) {
int num_fields = taos_field_count(result);
if (num_fields == 0) {
// not select or show commands
int affectRows = code;
int affectRows = taos_affected_rows(result);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, affect rows:%d, sql:%s",
pContext, pContext->fd, pContext->ipstr, pContext->user, multiCmds->pos, affectRows, sql);
@ -132,6 +135,7 @@ void httpProcessMultiSqlCallBack(void *param, TAOS_RES *result, int code) {
multiCmds->pos++;
}
taos_free_result(result);
httpProcessMultiSql(pContext);
} else {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, start retrieve, sql:%s",
@ -212,9 +216,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
if (numOfRows < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr,
pContext->user, tstrerror(numOfRows));
} else {
taos_free_result(result);
}
}
taos_free_result(result);
if (encode->stopJsonFp) {
(encode->stopJsonFp)(pContext, &pContext->singleCmd);
@ -247,12 +251,14 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) {
pContext, pContext->fd, pContext->ipstr, pContext->user, pContext->session->taos, tstrerror(code), pObj);
httpSendTaosdErrorResp(pContext, code);
}
taos_free_result(result);
return;
}
if (result == NULL) {
int num_fields = taos_field_count(result);
if (num_fields == 0) {
// not select or show commands
int affectRows = code;
int affectRows = taos_affected_rows(result);
httpTrace("context:%p, fd:%d, ip:%s, user:%s, affect rows:%d, stop query, sqlObj:%p",
pContext, pContext->fd, pContext->ipstr, pContext->user, affectRows, result);
@ -269,6 +275,7 @@ void httpProcessSingleSqlCallBack(void *param, TAOS_RES *result, int code) {
(encode->stopJsonFp)(pContext, &pContext->singleCmd);
}
taos_free_result(result);
httpCloseContextByApp(pContext);
} else {
httpTrace("context:%p, fd:%d, ip:%s, user:%s, start retrieve", pContext, pContext->fd, pContext->ipstr,

View File

@ -116,11 +116,14 @@ typedef struct SQueryCostInfo {
uint64_t loadDataInCacheSize;
uint64_t loadDataTime;
uint64_t dataInRows;
uint64_t checkRows;
uint32_t dataBlocks;
uint64_t totalRows;
uint64_t totalCheckedRows;
uint32_t totalBlocks;
uint32_t loadBlocks;
uint32_t loadBlockStatis;
uint32_t discardBlocks;
uint64_t elapsedTime;
uint64_t computTime;
} SQueryCostInfo;
typedef struct SGroupItem {
@ -168,7 +171,7 @@ typedef struct SQueryRuntimeEnv {
SWindowResInfo windowResInfo;
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostInfo summary;
SQueryCostInfo summary;
bool stableQuery; // super table query or not
void* pQueryHandle;
void* pSecQueryHandle; // another thread for
@ -177,8 +180,6 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo {
void* signature;
TSKEY startTime;
TSKEY elapsedTime;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
sem_t dataReady;

View File

@ -1183,6 +1183,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
STableQueryInfo* pTableQInfo = pQuery->current;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
@ -1190,10 +1191,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock);
}
// update the lastkey of current table
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
// interval query with limit applied
int32_t numOfRes = 0;
@ -2013,7 +2014,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
if (*pStatis == NULL) { // data block statistics does not exist, load data block
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
pRuntimeEnv->summary.checkRows += pBlockInfo->rows;
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
}
} else {
assert(r == BLK_DATA_ALL_NEEDED);
@ -2032,7 +2033,7 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
// return DISK_DATA_DISCARDED;
}
pRuntimeEnv->summary.checkRows += pBlockInfo->rows;
pRuntimeEnv->summary.totalCheckedRows += pBlockInfo->rows;
pDataBlock = tsdbRetrieveDataBlock(pQueryHandle, NULL);
}
@ -2149,7 +2150,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
pRuntimeEnv->summary.dataBlocks += 1;
pRuntimeEnv->summary.totalBlocks += 1;
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
return 0;
}
@ -2185,12 +2186,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
ensureOutputBuffer(pRuntimeEnv, &blockInfo);
SDataStatis *pStatis = NULL;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
pRuntimeEnv->summary.dataInRows += blockInfo.rows;
pRuntimeEnv->summary.totalRows += blockInfo.rows;
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, pQuery->current->lastKey);
@ -3247,7 +3246,7 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
free(pTableQueryInfo);
}
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
void setCurrentQueryTable(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableQueryInfo;
@ -3316,7 +3315,7 @@ static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *pTableQueryInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
assert(pTableQueryInfo->lastKey >= 0);
assert(pTableQueryInfo->lastKey >= TSKEY_INITIAL_VAL);
setTagVal(pRuntimeEnv, pTableId, pQInfo->tsdb);
@ -3528,10 +3527,11 @@ static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryIn
}
}
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo,
SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis, SArray *pDataBlock,
__block_search_fn_t searchFn) {
void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, SDataStatis *pStatis,
SArray *pDataBlock, __block_search_fn_t searchFn) {
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
@ -3664,10 +3664,8 @@ int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int
}
}
void queryCostStatis(SQInfo *pQInfo) {
static void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
// SQuery *pQuery = pRuntimeEnv->pQuery;
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
// if (pRuntimeEnv->pResultBuf == NULL) {
//// pSummary->tmpBufferInDisk = 0;
@ -3687,8 +3685,9 @@ void queryCostStatis(SQInfo *pQInfo) {
// pQInfo, pSummary->readDiskBlocks, pSummary->totalBlockSize, pSummary->loadBlocksUs / 1000.0,
// pSummary->skippedFileBlocks, pSummary->totalGenData);
qTrace("QInfo:%p cost: check blocks:%d, statis:%d, rows:%"PRId64", check rows:%"PRId64, pQInfo, pSummary->dataBlocks,
pSummary->loadBlockStatis, pSummary->dataInRows, pSummary->checkRows);
qTrace("QInfo:%p :cost summary: elpased time:%"PRId64" us, total blocks:%d, use block statis:%d, use block data:%d, "
"total rows:%"PRId64 ", check rows:%"PRId64, pQInfo, pSummary->elapsedTime, pSummary->totalBlocks,
pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
// qTrace("QInfo:%p cost: temp file:%d Bytes", pQInfo, pSummary->tmpBufferInDisk);
//
@ -4084,12 +4083,13 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQueryCostInfo* summary = &pRuntimeEnv->summary;
int64_t st = taosGetTimestampMs();
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (isQueryKilled(pQInfo)) {
break;
}
@ -4121,11 +4121,9 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
}
assert(pTableQueryInfo != NULL);
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
printf("table:%d, groupIndex:%d, rows:%d\n", pTableQueryInfo->id.tid, pTableQueryInfo->groupIndex, blockInfo.tid);
setCurrentQueryTable(pRuntimeEnv, pTableQueryInfo);
SDataStatis *pStatis = NULL;
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
if (!isIntervalQuery(pQuery)) {
@ -4134,15 +4132,14 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
} else { // interval query
TSKEY nextKey = blockInfo.window.skey;
setIntervalQueryRange(pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
pQInfo->code = ret;
return taosGetTimestampMs() - st;
}
/*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
}
stableApplyFunctionsOnBlock(pRuntimeEnv, pTableQueryInfo, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
summary->totalRows += blockInfo.rows;
stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey);
}
int64_t et = taosGetTimestampMs();
@ -4504,10 +4501,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
}
if (pQuery->rec.rows == 0) {
// queryCostStatis(pSupporter);
}
qTrace("QInfo:%p current:%lld, total:%lld", pQInfo, pQuery->rec.rows, pQuery->rec.total);
return;
}
@ -4790,7 +4783,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
}
qTrace("QInfo:%p query over, %d rows are returned", pQInfo, pQuery->rec.total);
queryCostStatis(pQInfo);
return;
}
@ -4813,7 +4805,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
}
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->groupInfo.numOfTables == 1);
/* check if query is killed or not */
@ -4822,10 +4814,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
} else {// todo set the table uid and tid in log
qTrace("QInfo:%p query paused, %" PRId64 " rows returned, numOfTotal:%" PRId64 " rows",
pQInfo, pQuery->rec.rows, pQuery->rec.total + pQuery->rec.rows);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
queryCostStatis(pQInfo);
}
}
}
@ -4847,13 +4835,10 @@ static void stableQueryImpl(SQInfo *pQInfo) {
}
// record the total elapsed time
pQInfo->elapsedTime += (taosGetTimestampUs() - st);
// taosFillSetStartInfo(&pQInfo->runtimeEnv.pFillInfo, pQuery->size, pQInfo->query.fillType);
pQInfo->runtimeEnv.summary.elapsedTime += (taosGetTimestampUs() - st);
if (pQuery->rec.rows == 0) {
qTrace("QInfo:%p over, %d tables queried, %d points are returned", pQInfo, pQInfo->groupInfo.numOfTables,
pQuery->rec.total);
// queryCostStatis(pSupporter);
qTrace("QInfo:%p over, %d tables queried, %d rows are returned", pQInfo, pQInfo->groupInfo.numOfTables, pQuery->rec.total);
}
}
@ -5833,9 +5818,11 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
numOfGroupByCols = 0;
}
// todo handle the error
/*int32_t ret =*/tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &groupInfo, pGroupColIndex,
numOfGroupByCols);
if (code != TSDB_CODE_SUCCESS) {
goto _over;
}
} else {
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
@ -5875,6 +5862,9 @@ _over:
void qDestroyQueryInfo(qinfo_t pQInfo) {
qTrace("QInfo:%p query completed", pQInfo);
// print the query cost summary
queryCostStatis(pQInfo);
freeQInfo(pQInfo);
}
@ -5951,6 +5941,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
return TSDB_CODE_INVALID_QHANDLE;
}
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
size_t size = getResultSize(pQInfo, &pQuery->rec.rows);
size += sizeof(int32_t);
@ -5964,7 +5955,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
int32_t code = pQInfo->code;
if (code == TSDB_CODE_SUCCESS) {
(*pRsp)->offset = htobe64(pQuery->limit.offset);
(*pRsp)->useconds = htobe64(pQInfo->elapsedTime);
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
} else {
(*pRsp)->offset = 0;
(*pRsp)->useconds = 0;

View File

@ -110,6 +110,14 @@ void taosDestoryFillInfo(SFillInfo* pFillInfo) {
tfree(pFillInfo->prevValues);
tfree(pFillInfo->nextValues);
tfree(pFillInfo->pTags);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
tfree(pFillInfo->pData[i]);
}
tfree(pFillInfo->pData);
tfree(pFillInfo->pFillCol);
tfree(pFillInfo);
}
@ -247,7 +255,7 @@ int taosDoLinearInterpolation(int32_t type, SPoint* point1, SPoint* point2, SPoi
}
static void setTagsValue(SFillInfo* pColInfo, tFilePage** data, char** pTags, int32_t start, int32_t num) {
for (int32_t j = 0, i = start; i < pColInfo->numOfCols + pColInfo->numOfTags; ++i, ++j) {
for (int32_t j = 0, i = start; i < pColInfo->numOfCols; ++i, ++j) {
SFillColInfo* pCol = &pColInfo->pFillCol[i];
char* val1 = elePtrAt(data[i]->data, pCol->col.bytes, num);
@ -344,7 +352,7 @@ static void doInterpoResultImpl(SFillInfo* pFillInfo, tFilePage** data, int32_t*
setTagsValue(pFillInfo, data, pTags, numOfValCols, *num);
}
} else { /* default value interpolation */
} else { /* fill the default value */
for (int32_t i = 1; i < numOfValCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];

View File

@ -211,7 +211,7 @@ static void* taosAcceptTcpConnection(void *arg) {
tTrace("%s TCP server socket was shutdown, exiting...", pServerObj->label);
break;
}
tError("%s TCP accept failure(%s)", pServerObj->label, errno, strerror(errno));
tError("%s TCP accept failure(%s)", pServerObj->label, strerror(errno));
continue;
}

View File

@ -193,7 +193,7 @@ TsdbQueryHandleT* tsdbQueryTables(TsdbRepoT* tsdb, STsdbQueryCond* pCond, STable
}
}
uTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
tsdbTrace("%p total numOfTable:%d in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
@ -282,10 +282,10 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer
uTrace("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
tsdbTrace("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
} else {
uTrace("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
tsdbTrace("%p uid:%" PRId64 ", tid:%d no data in mem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
}
if (!imemEmpty) {
@ -294,10 +294,10 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SDataRow row = SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer
uTrace("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
tsdbTrace("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
} else {
uTrace("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
tsdbTrace("%p uid:%"PRId64", tid:%d no data in imem", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid);
}
return true;
@ -338,7 +338,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
SDataRow row = SL_GET_NODE_DATA(node);
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
uTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
tsdbTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order, pHandle->qinfo);
// all data in mem are checked already.
@ -1038,7 +1038,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
cur->rows = numOfRows;
cur->pos = pos;
uTrace("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" %p", pQueryHandle, cur->win.skey,
tsdbTrace("%p uid:%" PRIu64",tid:%d data block created, brange:%"PRIu64"-%"PRIu64" %p", pQueryHandle, cur->win.skey,
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
}
@ -1138,7 +1138,7 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
if (pLeftBlockInfoEx->compBlock->offset == pRightBlockInfoEx->compBlock->offset &&
pLeftBlockInfoEx->compBlock->last == pRightBlockInfoEx->compBlock->last) {
// todo add more information
uError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->compBlock->offset);
tsdbError("error in header file, two block with same offset:%p", pLeftBlockInfoEx->compBlock->offset);
}
return pLeftBlockInfoEx->compBlock->offset > pRightBlockInfoEx->compBlock->offset ? 1 : -1;
@ -1200,7 +1200,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
numOfQualTables++;
}
uTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
tsdbTrace("%p create data blocks info struct completed, %d blocks in %d tables", pQueryHandle, cnt, numOfQualTables);
assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0
sup.numOfTables = numOfQualTables;
@ -1236,7 +1236,7 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
* }
*/
uTrace("%p %d data blocks sort completed", pQueryHandle, cnt);
tsdbTrace("%p %d data blocks sort completed", pQueryHandle, cnt);
cleanBlockOrderSupporter(&sup, numOfTables);
free(pTree);
@ -1257,7 +1257,7 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
break;
}
uTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
tsdbTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
numOfTables, pQueryHandle->pFileGroup->fileId);
assert(numOfBlocks >= 0);
@ -1583,7 +1583,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, STable* pTable, TSKEY
if ((key > maxKey && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < maxKey && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
uTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
tsdbTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
pQueryHandle->window.ekey);
break;
@ -1958,7 +1958,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
size_t size = taosArrayGetSize(pTableList);
if (size == 0) {
uTrace("no qualified tables");
tsdbTrace("no qualified tables");
return pTableGroup;
}
@ -1970,7 +1970,7 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
taosArrayPush(pTableGroup, &sa);
uTrace("all %d tables belong to one group", size);
tsdbTrace("all %d tables belong to one group", size);
} else {
STableGroupSupporter *pSupp = (STableGroupSupporter *) calloc(1, sizeof(STableGroupSupporter));
pSupp->tsdbMeta = tsdbGetMeta(tsdb);
@ -2069,12 +2069,12 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
SColIndex* pColIndex, int32_t numOfCols) {
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
if (pTable == NULL) {
uError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
tsdbError("%p failed to get stable, uid:%" PRIu64, tsdb, uid);
return TSDB_CODE_INVALID_TABLE_ID;
}
if (pTable->type != TSDB_SUPER_TABLE) {
uError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s",
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", tid:%d, name:%s",
tsdb, uid, pTable->tableId.tid, pTable->name);
return TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client
@ -2090,7 +2090,7 @@ int32_t tsdbQuerySTableByTagCond(TsdbRepoT* tsdb, uint64_t uid, const char* pTag
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, tsdb);
uTrace("no tbname condition or tagcond, all tables belongs to one group, numOfTables:%d", pGroupInfo->numOfTables);
tsdbTrace("no tbname condition or tagcond, all tables belongs to one group, numOfTables:%d", pGroupInfo->numOfTables);
} else {
// todo add error
}

View File

@ -113,7 +113,7 @@ extern "C" {
int32_t strdequote(char *src);
void strtrim(char *src);
size_t strtrim(char *src);
char *strnchr(char *haystack, char needle, int32_t len, bool skipquote);

View File

@ -60,7 +60,7 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing
}
void strtrim(char *z) {
size_t strtrim(char *z) {
int32_t i = 0;
int32_t j = 0;
@ -71,7 +71,7 @@ void strtrim(char *z) {
if (z[j] == 0) {
z[0] = 0;
return;
return 0;
}
delta = j;
@ -89,9 +89,12 @@ void strtrim(char *z) {
if (stop > 0) {
z[stop - delta] = 0;
return (stop - delta);
} else if (j != i) {
z[i] = 0;
}
return i;
}
char **strsplit(char *z, const char *delim, int32_t *num) {

View File

@ -20,10 +20,10 @@ TEST(testCase, string_dequote_test) {
EXPECT_STRCASEEQ(t1, "abc");
char t21[] = " abc ";
strtrim(t21);
int32_t lx = strtrim(t21);
EXPECT_STREQ("abc", t21);
EXPECT_EQ(3, strlen(t21));
EXPECT_EQ(3, lx);
}
TEST(testCase, string_replace_test) {

View File

@ -26,13 +26,12 @@
void taosMsleep(int mseconds);
static int32_t doQuery(TAOS* taos, const char* sql) {
TAOS_RES* res = taos_query(taos, sql);
if (res == NULL) {
TAOS_RES* res = taos_query(taos, sql);
if (taos_errno(res) != 0) {
printf("failed to execute query, reason:%s\n", taos_errstr(taos));
return -1;
}
//TAOS_RES* res = taos_use_result(taos);
TAOS_ROW row = NULL;
char buf[512] = {0};
@ -46,7 +45,6 @@ static int32_t doQuery(TAOS* taos, const char* sql) {
}
taos_free_result(res);
return 0;
}
@ -81,6 +79,7 @@ static __attribute__((unused)) void multiThreadTest(int32_t numOfThreads, void*
pthread_join(threadId[i], NULL);
}
free(threadId);
pthread_attr_destroy(&thattr);
}
@ -108,24 +107,11 @@ int main(int argc, char *argv[]) {
printf("success to connect to server\n");
// multiThreadTest(1, taos);
doQuery(taos, "select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_db0.lm2_stb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1, -2) group by t1 limit 2 offset 10;");
doQuery(taos, "insert into tb9 (ts, c1, c2) using stb (t1, t2) tags ('tag4', 4) values ( now + 4s, 'binary4', 4);");
// for(int32_t i = 0; i < 100000; ++i) {
// doQuery(taos, "insert into t1 values(now, 2)");
// }
// doQuery(taos, "create table t1(ts timestamp, k binary(12), f nchar(2))");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 'abc')");
// doQuery(taos, "create table if not exists tm0 (ts timestamp, k int);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:1', 1);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:2', 2);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:3', 3);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:4', 4);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:5', 5);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:6', 6);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:7', 7);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:8', 8);");
// doQuery(taos, "insert into tm0 values('2020-1-1 1:1:9', 9);");
// doQuery(taos, "select sum(k),count(*) from m1 group by a");
taos_close(taos);
return 0;
@ -167,8 +153,11 @@ int main(int argc, char *argv[]) {
// query the records
sprintf(qstr, "SELECT * FROM m1");
if (taos_query(taos, qstr) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(taos));
exit(1);
}
result = taos_query(taos, qstr);
if (result == NULL) {
printf("failed to get result, reason:%s\n", taos_errstr(taos));
exit(1);

View File

@ -609,7 +609,8 @@ class StateDbOnly(AnyState):
]
def verifyTasksToState(self, tasks, newState):
self.assertAtMostOneSuccess(tasks, DropDbTask) # not true in massively parralel cases
if ( not self.hasTask(tasks, CreateDbTask) ):
self.assertAtMostOneSuccess(tasks, DropDbTask) # only if we don't create any more
self.assertIfExistThenSuccess(tasks, DropDbTask)
# self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # not true in massively parrallel cases
# Nothing to be said about adding data task
@ -619,7 +620,8 @@ class StateDbOnly(AnyState):
# self._state = self.STATE_EMPTY
elif ( self.hasSuccess(tasks, CreateFixedSuperTableTask) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful
if ( not self.hasTask(tasks, DropFixedSuperTableTask) ):
self.assertAtMostOneSuccess(tasks, CreateFixedSuperTableTask) # at most 1 attempt is successful, if we don't drop anything
self.assertNoTask(tasks, DropDbTask) # should have have tried
# if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# # can't say there's add-data attempts, since they may all fail
@ -674,7 +676,7 @@ class StateHasData(AnyState):
if ( not self.hasTask(tasks, CreateDbTask)): # without a create_db task
self.assertNoTask(tasks, DropDbTask) # we must have drop_db task
self.hasSuccess(tasks, DropFixedSuperTableTask)
self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
# self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
elif ( newState.equals(AnyState.STATE_TABLE_ONLY) ): # data deleted
self.assertNoTask(tasks, DropDbTask)
self.assertNoTask(tasks, DropFixedSuperTableTask)
@ -689,9 +691,9 @@ class StateHasData(AnyState):
# State of the database as we believe it to be
class DbState():
def __init__(self):
def __init__(self, resetDb = True):
self.tableNumQueue = LinearQueue()
self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
self._lastTick = self.setupLastTick() # datetime.datetime(2019, 1, 1) # initial date time tick
self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock()
@ -712,12 +714,32 @@ class DbState():
except:
print("[=] Unexpected exception")
raise
self._dbConn.resetDb() # drop and recreate DB
self._state = StateEmpty() # initial state, the result of above
if resetDb :
self._dbConn.resetDb() # drop and recreate DB
self._state = self._findCurrentState()
def getDbConn(self):
return self._dbConn
def getState(self):
return self._state
# We aim to create a starting time tick, such that, whenever we run our test here once
# We should be able to safely create 100,000 records, which will not have any repeated time stamp
# when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
# by a factor of 500.
# TODO: what if it goes beyond 10 years into the future
def setupLastTick(self):
t1 = datetime.datetime(2020, 5, 30)
t2 = datetime.datetime.now()
elSec = t2.timestamp() - t1.timestamp()
# print("elSec = {}".format(elSec))
t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec * 500) # see explanation above
logger.info("Setting up TICKS to start from: {}".format(t4))
return t4
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
@ -743,7 +765,7 @@ class DbState():
return self._lastInt
def getNextBinary(self):
return "Los_Angeles_{}".format(self.getNextInt())
return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(self.getNextInt())
def getNextFloat(self):
return 0.9 + self.getNextInt()
@ -1089,7 +1111,7 @@ class CreateFixedSuperTableTask(StateTransitionTask):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedSuperTableName()
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(20), f float) ".format(tblName))
wt.execSql("create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
# No need to create the regular tables, INSERT will do that automatically
@ -1148,12 +1170,13 @@ class AddFixedDataTask(StateTransitionTask):
ds = self._dbState
wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
for i in range(10): # 0 to 9
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
i,
ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
for j in range(10) :
sql = "insert into db.reg_table_{} using {} tags ('{}', {}) values ('{}', {});".format(
i,
ds.getFixedSuperTableName(),
ds.getNextBinary(), ds.getNextFloat(),
ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
#---------- Non State-Transition Related Tasks ----------#
@ -1301,7 +1324,9 @@ def main():
ch = logging.StreamHandler()
logger.addHandler(ch)
dbState = DbState()
# resetDb = False # DEBUG only
# dbState = DbState(resetDb) # DBEUG only!
dbState = DbState() # Regular function
Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator(
ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
@ -1309,6 +1334,43 @@ def main():
dbState
)
# # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
# dbc = dbState.getDbConn()
# sTbName = dbState.getFixedSuperTableName()
# dbc.execute("create database if not exists db")
# if not dbState.getState().equals(StateEmpty()):
# dbc.execute("use db")
# rTables = None
# try: # the super table may not exist
# sql = "select TBNAME from db.{}".format(sTbName)
# logger.info("Finding out tables in super table: {}".format(sql))
# dbc.query(sql) # TODO: analyze result set later
# logger.info("Fetching result")
# rTables = dbc.getQueryResult()
# logger.info("Result: {}".format(rTables))
# except taos.error.ProgrammingError as err:
# logger.info("Initial Super table OPS error: {}".format(err))
# # sys.exit()
# if ( not rTables == None):
# # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
# try:
# for rTbName in rTables : # regular tables
# ds = dbState
# logger.info("Inserting into table: {}".format(rTbName[0]))
# sql = "insert into db.{} values ('{}', {});".format(
# rTbName[0],
# ds.getNextTick(), ds.getNextInt())
# dbc.execute(sql)
# for rTbName in rTables : # regular tables
# dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
# logger.info("Initial READING operation is successful")
# except taos.error.ProgrammingError as err:
# logger.info("Initial WRITE/READ error: {}".format(err))
# Sandbox testing code
# dbc = dbState.getDbConn()
# while True:

View File

@ -1,831 +0,0 @@
#!/usr/bin/python3.7
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from __future__ import annotations # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
import sys
# Require Python 3
if sys.version_info[0] < 3:
raise Exception("Must be using Python 3")
import getopt
import argparse
import copy
import threading
import random
import logging
import datetime
import textwrap
from typing import List
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *
import crash_gen
import taos
# Global variables, tried to keep a small number.
gConfig = None # Command-line/Environment Configurations, will set a bit later
logger = None
def runThread(wt: WorkerThread):
wt.run()
class WorkerThread:
def __init__(self, pool: ThreadPool, tid,
tc: ThreadCoordinator,
# te: TaskExecutor,
): # note: main thread context!
# self._curStep = -1
self._pool = pool
self._tid = tid
self._tc = tc
# self.threadIdent = threading.get_ident()
self._thread = threading.Thread(target=runThread, args=(self,))
self._stepGate = threading.Event()
# Let us have a DB connection of our own
if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn = DbConn()
def logDebug(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg))
def logInfo(self, msg):
logger.info(" t[{}] {}".format(self._tid, msg))
def getTaskExecutor(self):
return self._tc.getTaskExecutor()
def start(self):
self._thread.start() # AFTER the thread is recorded
def run(self):
# initialization after thread starts, in the thread context
# self.isSleeping = False
logger.info("Starting to run thread: {}".format(self._tid))
if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn.open()
self._doTaskLoop()
# clean up
if ( gConfig.per_thread_db_connection ): # type: ignore
self._dbConn.close()
def _doTaskLoop(self) :
# while self._curStep < self._pool.maxSteps:
# tc = ThreadCoordinator(None)
while True:
tc = self._tc # Thread Coordinator, the overall master
tc.crossStepBarrier() # shared barrier first, INCLUDING the last one
logger.debug("Thread task loop exited barrier...")
self.crossStepGate() # then per-thread gate, after being tapped
logger.debug("Thread task loop exited step gate...")
if not self._tc.isRunning():
break
task = tc.fetchTask()
task.execute(self)
tc.saveExecutedTask(task)
def verifyThreadSelf(self): # ensure we are called by this own thread
if ( threading.get_ident() != self._thread.ident ):
raise RuntimeError("Unexpectly called from other threads")
def verifyThreadMain(self): # ensure we are called by the main thread
if ( threading.get_ident() != threading.main_thread().ident ):
raise RuntimeError("Unexpectly called from other threads")
def verifyThreadAlive(self):
if ( not self._thread.is_alive() ):
raise RuntimeError("Unexpected dead thread")
# A gate is different from a barrier in that a thread needs to be "tapped"
def crossStepGate(self):
self.verifyThreadAlive()
self.verifyThreadSelf() # only allowed by ourselves
# Wait again at the "gate", waiting to be "tapped"
# logger.debug("Worker thread {} about to cross the step gate".format(self._tid))
self._stepGate.wait()
self._stepGate.clear()
# self._curStep += 1 # off to a new step...
def tapStepGate(self): # give it a tap, release the thread waiting there
self.verifyThreadAlive()
self.verifyThreadMain() # only allowed for main thread
logger.debug("Tapping worker thread {}".format(self._tid))
self._stepGate.set() # wake up!
time.sleep(0) # let the released thread run a bit
def execSql(self, sql): # not "execute", since we are out side the DB context
if ( gConfig.per_thread_db_connection ):
return self._dbConn.execute(sql)
else:
return self._tc.getDbState().getDbConn().execute(sql)
def querySql(self, sql): # not "execute", since we are out side the DB context
if ( gConfig.per_thread_db_connection ):
return self._dbConn.query(sql)
else:
return self._tc.getDbState().getDbConn().query(sql)
class ThreadCoordinator:
def __init__(self, pool, wd: WorkDispatcher, dbState):
self._curStep = -1 # first step is 0
self._pool = pool
self._wd = wd
self._te = None # prepare for every new step
self._dbState = dbState
self._executedTasks: List[Task] = [] # in a given step
self._lock = threading.RLock() # sync access for a few things
self._stepBarrier = threading.Barrier(self._pool.numThreads + 1) # one barrier for all threads
def getTaskExecutor(self):
return self._te
def getDbState(self) -> DbState :
return self._dbState
def crossStepBarrier(self):
self._stepBarrier.wait()
def run(self):
self._pool.createAndStartThreads(self)
# Coordinate all threads step by step
self._curStep = -1 # not started yet
maxSteps = gConfig.max_steps # type: ignore
while(self._curStep < maxSteps):
print(".", end="", flush=True)
logger.debug("Main thread going to sleep")
# Now ready to enter a step
self.crossStepBarrier() # let other threads go past the pool barrier, but wait at the thread gate
self._stepBarrier.reset() # Other worker threads should now be at the "gate"
# At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
self._dbState.transition(self._executedTasks) # at end of step, transiton the DB state
self.resetExecutedTasks() # clear the tasks after we are done
# Get ready for next step
logger.info("<-- Step {} finished".format(self._curStep))
self._curStep += 1 # we are about to get into next step. TODO: race condition here!
logger.debug("\r\n--> Step {} starts with main thread waking up".format(self._curStep)) # Now not all threads had time to go to sleep
# A new TE for the new step
self._te = TaskExecutor(self._curStep)
logger.debug("Main thread waking up at step {}, tapping worker threads".format(self._curStep)) # Now not all threads had time to go to sleep
self.tapAllThreads()
logger.debug("Main thread ready to finish up...")
self.crossStepBarrier() # Cross it one last time, after all threads finish
self._stepBarrier.reset()
logger.debug("Main thread in exclusive zone...")
self._te = None # No more executor, time to end
logger.debug("Main thread tapping all threads one last time...")
self.tapAllThreads() # Let the threads run one last time
logger.debug("Main thread joining all threads")
self._pool.joinAll() # Get all threads to finish
logger.info("All threads finished")
print("\r\nFinished")
def tapAllThreads(self): # in a deterministic manner
wakeSeq = []
for i in range(self._pool.numThreads): # generate a random sequence
if Dice.throw(2) == 1 :
wakeSeq.append(i)
else:
wakeSeq.insert(0, i)
logger.info("Waking up threads: {}".format(str(wakeSeq)))
# TODO: set dice seed to a deterministic value
for i in wakeSeq:
self._pool.threadList[i].tapStepGate() # TODO: maybe a bit too deep?!
time.sleep(0) # yield
def isRunning(self):
return self._te != None
def fetchTask(self) -> Task :
if ( not self.isRunning() ): # no task
raise RuntimeError("Cannot fetch task when not running")
# return self._wd.pickTask()
# Alternatively, let's ask the DbState for the appropriate task
dbState = self.getDbState()
tasks = dbState.getTasksAtState()
i = Dice.throw(len(tasks))
# return copy.copy(tasks[i]) # Needs a fresh copy, to save execution results, etc.
return tasks[i].clone()
def resetExecutedTasks(self):
self._executedTasks = [] # should be under single thread
def saveExecutedTask(self, task):
with self._lock:
self._executedTasks.append(task)
# We define a class to run a number of threads in locking steps.
class ThreadPool:
def __init__(self, dbState, numThreads, maxSteps, funcSequencer):
self.numThreads = numThreads
self.maxSteps = maxSteps
self.funcSequencer = funcSequencer
# Internal class variables
self.dispatcher = WorkDispatcher(dbState)
self.curStep = 0
self.threadList = []
# self.stepGate = threading.Condition() # Gate to hold/sync all threads
# self.numWaitingThreads = 0
# starting to run all the threads, in locking steps
def createAndStartThreads(self, tc: ThreadCoordinator):
for tid in range(0, self.numThreads): # Create the threads
workerThread = WorkerThread(self, tid, tc)
self.threadList.append(workerThread)
workerThread.start() # start, but should block immediately before step 0
def joinAll(self):
for workerThread in self.threadList:
logger.debug("Joining thread...")
workerThread._thread.join()
# A queue of continguous POSITIVE integers
class LinearQueue():
def __init__(self):
self.firstIndex = 1 # 1st ever element
self.lastIndex = 0
self._lock = threading.RLock() # our functions may call each other
self.inUse = set() # the indexes that are in use right now
def toText(self):
return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)
# Push (add new element, largest) to the tail, and mark it in use
def push(self):
with self._lock:
# if ( self.isEmpty() ):
# self.lastIndex = self.firstIndex
# return self.firstIndex
# Otherwise we have something
self.lastIndex += 1
self.allocate(self.lastIndex)
# self.inUse.add(self.lastIndex) # mark it in use immediately
return self.lastIndex
def pop(self):
with self._lock:
if ( self.isEmpty() ):
# raise RuntimeError("Cannot pop an empty queue")
return False # TODO: None?
index = self.firstIndex
if ( index in self.inUse ):
return False
self.firstIndex += 1
return index
def isEmpty(self):
return self.firstIndex > self.lastIndex
def popIfNotEmpty(self):
with self._lock:
if (self.isEmpty()):
return 0
return self.pop()
def allocate(self, i):
with self._lock:
# logger.debug("LQ allocating item {}".format(i))
if ( i in self.inUse ):
raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
self.inUse.add(i)
def release(self, i):
with self._lock:
# logger.debug("LQ releasing item {}".format(i))
self.inUse.remove(i) # KeyError possible, TODO: why?
def size(self):
return self.lastIndex + 1 - self.firstIndex
def pickAndAllocate(self):
if ( self.isEmpty() ):
return None
with self._lock:
cnt = 0 # counting the interations
while True:
cnt += 1
if ( cnt > self.size()*10 ): # 10x iteration already
# raise RuntimeError("Failed to allocate LinearQueue element")
return None
ret = Dice.throwRange(self.firstIndex, self.lastIndex+1)
if ( not ret in self.inUse ):
self.allocate(ret)
return ret
class DbConn:
def __init__(self):
self._conn = None
self._cursor = None
self.isOpen = False
def open(self): # Open connection
if ( self.isOpen ):
raise RuntimeError("Cannot re-open an existing DB connection")
cfgPath = "../../build/test/cfg"
self._conn = taos.connect(host="127.0.0.1", config=cfgPath) # TODO: make configurable
self._cursor = self._conn.cursor()
# Get the connection/cursor ready
self._cursor.execute('reset query cache')
# self._cursor.execute('use db')
# Open connection
self._tdSql = TDSql()
self._tdSql.init(self._cursor)
self.isOpen = True
def resetDb(self): # reset the whole database, etc.
if ( not self.isOpen ):
raise RuntimeError("Cannot reset database until connection is open")
# self._tdSql.prepare() # Recreate database, etc.
self._cursor.execute('drop database if exists db')
logger.debug("Resetting DB, dropped database")
# self._cursor.execute('create database db')
# self._cursor.execute('use db')
# tdSql.execute('show databases')
def close(self):
if ( not self.isOpen ):
raise RuntimeError("Cannot clean up database until connection is open")
self._tdSql.close()
self.isOpen = False
def execute(self, sql):
if ( not self.isOpen ):
raise RuntimeError("Cannot execute database commands until connection is open")
return self._tdSql.execute(sql)
def query(self, sql) -> int : # return number of rows retrieved
if ( not self.isOpen ):
raise RuntimeError("Cannot query database until connection is open")
return self._tdSql.query(sql)
# State of the database as we believe it to be
class DbState():
STATE_INVALID = -1
STATE_EMPTY = 1 # nothing there, no even a DB
STATE_DB_ONLY = 2 # we have a DB, but nothing else
STATE_TABLE_ONLY = 3 # we have a table, but totally empty
STATE_HAS_DATA = 4 # we have some data in the table
def __init__(self):
self.tableNumQueue = LinearQueue()
self._lastTick = datetime.datetime(2019, 1, 1) # initial date time tick
self._lastInt = 0 # next one is initial integer
self._lock = threading.RLock()
self._state = self.STATE_INVALID
# self.openDbServerConnection()
self._dbConn = DbConn()
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if ( err.msg == 'disconnected' ): # cannot open DB connection
print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit()
else:
raise
except:
print("[=]Unexpected exception")
raise
self._dbConn.resetDb() # drop and recreate DB
self._state = self.STATE_EMPTY # initial state, the result of above
def getDbConn(self):
return self._dbConn
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
def getFixedTableName(self):
return "fixed_table"
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
def getNextTick(self):
with self._lock: # prevent duplicate tick
self._lastTick += datetime.timedelta(0, 1) # add one second to it
return self._lastTick
def getNextInt(self):
with self._lock:
self._lastInt += 1
return self._lastInt
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if ( not tblNum ): # maybe false
return False
return "table_{}".format(tblNum)
def execSql(self, sql): # using the main DB connection
return self._dbConn.execute(sql)
def cleanUp(self):
self._dbConn.close()
def getTasksAtState(self):
tasks = []
tasks.append(ReadFixedDataTask(self)) # always
if ( self._state == self.STATE_EMPTY ):
tasks.append(CreateDbTask(self))
tasks.append(CreateFixedTableTask(self))
elif ( self._state == self.STATE_DB_ONLY ):
tasks.append(DropDbTask(self))
tasks.append(CreateFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
elif ( self._state == self.STATE_TABLE_ONLY ):
tasks.append(DropFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
elif ( self._state == self.STATE_HAS_DATA ) : # same as above. TODO: adjust
tasks.append(DropFixedTableTask(self))
tasks.append(AddFixedDataTask(self))
else:
raise RuntimeError("Unexpected DbState state: {}".format(self._state))
return tasks
def transition(self, tasks):
if ( len(tasks) == 0 ): # before 1st step, or otherwise empty
return # do nothing
if ( self._state == self.STATE_EMPTY ):
# self.assertNoSuccess(tasks, ReadFixedDataTask) # some read may be successful, since we might be creating a table
if ( self.hasSuccess(tasks, CreateDbTask) ):
self.assertAtMostOneSuccess(tasks, CreateDbTask) # param is class
self._state = self.STATE_DB_ONLY
if ( self.hasSuccess(tasks, CreateFixedTableTask )):
self._state = self.STATE_TABLE_ONLY
# else: # no successful table creation, not much we can say, as it is step 2
else: # did not create db
self.assertNoTask(tasks, CreateDbTask) # because we did not have such task
# self.assertNoSuccess(tasks, CreateDbTask) # not necessary, since we just verified no such task
self.assertNoSuccess(tasks, CreateFixedTableTask)
elif ( self._state == self.STATE_DB_ONLY ):
self.assertAtMostOneSuccess(tasks, DropDbTask)
self.assertIfExistThenSuccess(tasks, DropDbTask)
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask)
# Nothing to be said about adding data task
if ( self.hasSuccess(tasks, DropDbTask) ): # dropped the DB
# self.assertHasTask(tasks, DropDbTask) # implied by hasSuccess
self.assertAtMostOneSuccess(tasks, DropDbTask)
self._state = self.STATE_EMPTY
elif ( self.hasSuccess(tasks, CreateFixedTableTask) ): # did not drop db, create table success
# self.assertHasTask(tasks, CreateFixedTableTask) # tried to create table
self.assertAtMostOneSuccess(tasks, CreateFixedTableTask) # at most 1 attempt is successful
self.assertNoTask(tasks, DropDbTask) # should have have tried
if ( not self.hasSuccess(tasks, AddFixedDataTask) ): # just created table, no data yet
# can't say there's add-data attempts, since they may all fail
self._state = self.STATE_TABLE_ONLY
else:
self._state = self.STATE_HAS_DATA
else: # no success in dropping db tasks, no success in create fixed table, not acceptable
raise RuntimeError("Unexpected no-success scenario")
elif ( self._state == self.STATE_TABLE_ONLY ):
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
self._state = self.STATE_DB_ONLY
elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table
self.assertNoTask(tasks, DropFixedTableTask)
self._state = self.STATE_HAS_DATA
else: # did not drop table, did not insert data, that is impossible
raise RuntimeError("Unexpected no-success scenarios")
elif ( self._state == self.STATE_HAS_DATA ): # Same as above, TODO: adjust
if ( self.hasSuccess(tasks, DropFixedTableTask) ):
self.assertAtMostOneSuccess(tasks, DropFixedTableTask)
self._state = self.STATE_DB_ONLY
elif ( self.hasSuccess(tasks, AddFixedDataTask) ): # no success dropping the table
self.assertNoTask(tasks, DropFixedTableTask)
self._state = self.STATE_HAS_DATA
else: # did not drop table, did not insert data, that is impossible
raise RuntimeError("Unexpected no-success scenarios")
else:
raise RuntimeError("Unexpected DbState state: {}".format(self._state))
logger.debug("New DB state is: {}".format(self._state))
def assertAtMostOneSuccess(self, tasks, cls):
sCnt = 0
for task in tasks :
if not isinstance(task, cls):
continue
if task.isSuccess():
task.logDebug("Task success found")
sCnt += 1
if ( sCnt >= 2 ):
raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))
def assertIfExistThenSuccess(self, tasks, cls):
sCnt = 0
exists = False
for task in tasks :
if not isinstance(task, cls):
continue
exists = True # we have a valid instance
if task.isSuccess():
sCnt += 1
if ( exists and sCnt <= 0 ):
raise RuntimeError("Unexpected zero success for task: {}".format(cls))
def assertNoTask(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
raise RuntimeError("Unexpected task: {}".format(cls))
def assertNoSuccess(self, tasks, cls):
for task in tasks :
if isinstance(task, cls):
if task.isSuccess():
raise RuntimeError("Unexpected successful task: {}".format(cls))
def hasSuccess(self, tasks, cls):
for task in tasks :
if not isinstance(task, cls):
continue
if task.isSuccess():
return True
return False
class TaskExecutor():
def __init__(self, curStep):
self._curStep = curStep
def getCurStep(self):
return self._curStep
def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
task.execute(wt)
# def logInfo(self, msg):
# logger.info(" T[{}.x]: ".format(self._curStep) + msg)
# def logDebug(self, msg):
# logger.debug(" T[{}.x]: ".format(self._curStep) + msg)
class Task():
taskSn = 100
@classmethod
def allocTaskNum(cls):
cls.taskSn += 1
return cls.taskSn
def __init__(self, dbState: DbState):
self._dbState = dbState
self._workerThread = None
self._err = None
self._curStep = None
self._numRows = None # Number of rows affected
# Assign an incremental task serial number
self._taskNum = self.allocTaskNum()
def isSuccess(self):
return self._err == None
def clone(self):
newTask = self.__class__(self._dbState)
return newTask
def logDebug(self, msg):
self._workerThread.logDebug("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def logInfo(self, msg):
self._workerThread.logInfo("s[{}.{}] {}".format(self._curStep, self._taskNum, msg))
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))
def execute(self, wt: WorkerThread):
wt.verifyThreadSelf()
self._workerThread = wt # type: ignore
te = wt.getTaskExecutor()
self._curStep = te.getCurStep()
self.logDebug("[-] executing task {}...".format(self.__class__.__name__))
self._err = None
try:
self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err:
self.logDebug("[=]Taos Execution exception: {0}".format(err))
self._err = err
except:
self.logDebug("[=]Unexpected exception")
raise
self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
def execSql(self, sql):
return self._dbState.execute(sql)
class CreateDbTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("create database db")
class DropDbTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
wt.execSql("drop database db")
class CreateTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tIndex = self._dbState.addTable()
self.logDebug("Creating a table {} ...".format(tIndex))
wt.execSql("create table db.table_{} (ts timestamp, speed int)".format(tIndex))
self.logDebug("Table {} created.".format(tIndex))
self._dbState.releaseTable(tIndex)
class CreateFixedTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
wt.execSql("create table db.{} (ts timestamp, speed int)".format(tblName))
class ReadFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
self._numRows = wt.querySql("select * from db.{}".format(tblName)) # save the result for later
# tdSql.query(" cars where tbname in ('carzero', 'carone')")
class DropTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tableName = self._dbState.getTableNameToDelete()
if ( not tableName ): # May be "False"
self.logInfo("Cannot generate a table to delete, skipping...")
return
self.logInfo("Dropping a table db.{} ...".format(tableName))
wt.execSql("drop table db.{}".format(tableName))
class DropFixedTableTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
tblName = self._dbState.getFixedTableName()
wt.execSql("drop table db.{}".format(tblName))
class AddDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
self.logInfo("Adding some data... numQueue={}".format(ds.tableNumQueue.toText()))
tIndex = ds.pickAndAllocateTable()
if ( tIndex == None ):
self.logInfo("No table found to add data, skipping...")
return
sql = "insert into db.table_{} values ('{}', {});".format(tIndex, ds.getNextTick(), ds.getNextInt())
self.logDebug("Executing SQL: {}".format(sql))
wt.execSql(sql)
ds.releaseTable(tIndex)
self.logDebug("Finished adding data")
class AddFixedDataTask(Task):
def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
ds = self._dbState
sql = "insert into db.table_{} values ('{}', {});".format(ds.getFixedTableName(), ds.getNextTick(), ds.getNextInt())
wt.execSql(sql)
# Deterministic random number generator
class Dice():
seeded = False # static, uninitialized
@classmethod
def seed(cls, s): # static
if (cls.seeded):
raise RuntimeError("Cannot seed the random generator more than once")
cls.verifyRNG()
random.seed(s)
cls.seeded = True # TODO: protect against multi-threading
@classmethod
def verifyRNG(cls): # Verify that the RNG is determinstic
random.seed(0)
x1 = random.randrange(0, 1000)
x2 = random.randrange(0, 1000)
x3 = random.randrange(0, 1000)
if ( x1 != 864 or x2!=394 or x3!=776 ):
raise RuntimeError("System RNG is not deterministic")
@classmethod
def throw(cls, stop): # get 0 to stop-1
return cls.throwRange(0, stop)
@classmethod
def throwRange(cls, start, stop): # up to stop-1
if ( not cls.seeded ):
raise RuntimeError("Cannot throw dice before seeding it")
return random.randrange(start, stop)
# Anyone needing to carry out work should simply come here
class WorkDispatcher():
def __init__(self, dbState):
# self.totalNumMethods = 2
self.tasks = [
CreateTableTask(dbState),
DropTableTask(dbState),
AddDataTask(dbState),
]
def throwDice(self):
max = len(self.tasks) - 1
dRes = random.randint(0, max)
# logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
return dRes
def pickTask(self):
dice = self.throwDice()
return self.tasks[dice]
def doWork(self, workerThread):
task = self.pickTask()
task.execute(workerThread)
def main():
# Super cool Python argument library: https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent('''\
TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
---------------------------------------------------------------------
1. You build TDengine in the top level ./build directory, as described in offical docs
2. You run the server there before this script: ./build/bin/taosd -c test/cfg
'''))
parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
help='Use a single shared db connection (default: false)')
parser.add_argument('-d', '--debug', action='store_true',
help='Turn on DEBUG mode for more logging (default: false)')
parser.add_argument('-s', '--max-steps', action='store', default=100, type=int,
help='Maximum number of steps to run (default: 100)')
parser.add_argument('-t', '--num-threads', action='store', default=10, type=int,
help='Number of threads to run (default: 10)')
global gConfig
gConfig = parser.parse_args()
if len(sys.argv) == 1:
parser.print_help()
sys.exit()
global logger
logger = logging.getLogger('myApp')
if ( gConfig.debug ):
logger.setLevel(logging.DEBUG) # default seems to be INFO
ch = logging.StreamHandler()
logger.addHandler(ch)
dbState = DbState()
Dice.seed(0) # initial seeding of dice
tc = ThreadCoordinator(
ThreadPool(dbState, gConfig.num_threads, gConfig.max_steps, 0),
WorkDispatcher(dbState),
dbState
)
tc.run()
dbState.cleanUp()
logger.info("Finished running thread pool")
if __name__ == "__main__":
main()

View File

@ -1,41 +0,0 @@
#!/bin/bash
# This is the script for us to try to cause the TDengine server or client to crash
#
# PREPARATION
#
# 1. Build an compile the TDengine source code that comes with this script, in the same directory tree
# 2. Please follow the direction in our README.md, and build TDengine in the build/ directory
# 3. Adjust the configuration file if needed under build/test/cfg/taos.cfg
# 4. Run the TDengine server instance: cd build; ./build/bin/taosd -c test/cfg
# 5. Make sure you have a working Python3 environment: run /usr/bin/python3 --version, and you should get 3.6 or above
# 6. Make sure you have the proper Python packages: # sudo apt install python3-setuptools python3-pip python3-distutils
#
# RUNNING THIS SCRIPT
#
# This script assumes the source code directory is intact, and that the binaries has been built in the
# build/ directory, as such, will will load the Python libraries in the directory tree, and also load
# the TDengine client shared library (so) file, in the build/directory, as evidenced in the env
# variables below.
#
# Running the script is simple, no parameter is needed (for now, but will change in the future).
#
# Happy Crashing...
# Due to the heavy path name assumptions/usage, let us require that the user be in the current directory
EXEC_DIR=`dirname "$0"`
if [[ $EXEC_DIR != "." ]]
then
echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)"
exit -1
fi
# First we need to set up a path for Python to find our own TAOS modules, so that "import" can work.
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
# Then let us set up the library path so that our compiled SO file can be loaded by Python
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$(pwd)/../../build/build/lib
# Now we are all let, and let's see if we can find a crash. Note we pass all params
./crash_gen_0519.py $@

View File

@ -0,0 +1,213 @@
#unsupport run general/alter/cached_schema_after_alter.sim
#unsupport run general/alter/count.sim
#unsupport run general/alter/import.sim
#unsupport run general/alter/insert1.sim
#unsupport run general/alter/insert2.sim
#unsupport run general/alter/metrics.sim
#unsupport run general/alter/table.sim
run general/cache/new_metrics.sim
run general/cache/restart_metrics.sim
run general/cache/restart_table.sim
run general/connection/connection.sim
run general/column/commit.sim
run general/column/metrics.sim
run general/column/table.sim
run general/compress/commitlog.sim
run general/compress/compress.sim
run general/compress/compress2.sim
run general/compress/uncompress.sim
run general/compute/avg.sim
run general/compute/bottom.sim
run general/compute/count.sim
run general/compute/diff.sim
run general/compute/diff2.sim
run general/compute/first.sim
run general/compute/interval.sim
run general/compute/last.sim
run general/compute/leastsquare.sim
run general/compute/max.sim
run general/compute/min.sim
run general/compute/null.sim
run general/compute/percentile.sim
run general/compute/stddev.sim
run general/compute/sum.sim
run general/compute/top.sim
run general/db/alter_option.sim
run general/db/alter_tables_d2.sim
run general/db/alter_tables_v1.sim
run general/db/alter_tables_v4.sim
run general/db/alter_vgroups.sim
run general/db/basic.sim
run general/db/basic1.sim
run general/db/basic2.sim
run general/db/basic3.sim
run general/db/basic4.sim
run general/db/basic5.sim
run general/db/delete_reuse1.sim
run general/db/delete_reuse2.sim
run general/db/delete_reusevnode.sim
run general/db/delete_reusevnode2.sim
run general/db/delete_writing1.sim
run general/db/delete_writing2.sim
run general/db/delete.sim
run general/db/len.sim
run general/db/repeat.sim
run general/db/tables.sim
run general/db/vnodes.sim
run general/field/2.sim
run general/field/3.sim
run general/field/4.sim
run general/field/5.sim
run general/field/6.sim
run general/field/bigint.sim
run general/field/binary.sim
run general/field/bool.sim
run general/field/single.sim
run general/field/smallint.sim
run general/field/tinyint.sim
run general/http/restful.sim
run general/http/restful_insert.sim
run general/http/restful_limit.sim
run general/http/restful_full.sim
run general/http/prepare.sim
run general/http/telegraf.sim
run general/http/grafana_bug.sim
run general/http/grafana.sim
run general/import/basic.sim
run general/import/commit.sim
run general/import/large.sim
run general/import/replica1.sim
run general/insert/basic.sim
run general/insert/insert_drop.sim
run general/insert/query_block1_memory.sim
run general/insert/query_block2_memory.sim
run general/insert/query_block1_file.sim
run general/insert/query_block2_file.sim
run general/insert/query_file_memory.sim
run general/insert/query_multi_file.sim
run general/insert/tcp.sim
#unsupport run general/parser/alter.sim
#unsupport run general/parser/alter1.sim
#unsupport run general/parser/alter_stable.sim
run general/parser/auto_create_tb.sim
run general/parser/auto_create_tb_drop_tb.sim
run general/parser/col_arithmetic_operation.sim
run general/parser/columnValue.sim
run general/parser/commit.sim
run general/parser/create_db.sim
run general/parser/create_mt.sim
run general/parser/create_tb.sim
run general/parser/dbtbnameValidate.sim
run general/parser/import_commit1.sim
run general/parser/import_commit2.sim
run general/parser/import_commit3.sim
run general/parser/insert_tb.sim
run general/parser/first_last.sim
#unsupport run general/parser/import_file.sim
run general/parser/lastrow.sim
run general/parser/nchar.sim
#unsupport run general/parser/null_char.sim
run general/parser/single_row_in_tb.sim
run general/parser/select_from_cache_disk.sim
run general/parser/limit.sim
run general/parser/limit1.sim
run general/parser/limit1_tblocks100.sim
run general/parser/mixed_blocks.sim
run general/parser/selectResNum.sim
run general/parser/select_across_vnodes.sim
run general/parser/slimit1.sim
run general/parser/tbnameIn.sim
run general/parser/binary_escapeCharacter.sim
run general/parser/projection_limit_offset.sim
run general/parser/limit2.sim
run general/parser/slimit.sim
run general/parser/fill.sim
run general/parser/fill_stb.sim
run general/parser/interp.sim
run general/parser/where.sim
#unsupport run general/parser/join.sim
#unsupport run general/parser/join_multivnode.sim
run general/parser/select_with_tags.sim
#unsupport run general/parser/groupby.sim
#unsupport run general/parser/bug.sim
#unsupport run general/parser/tags_dynamically_specifiy.sim
#unsupport run general/parser/set_tag_vals.sim
#unsupport run general/parser/repeatAlter.sim
#unsupport run general/parser/slimit_alter_tags.sim
#unsupport run general/parser/stream_on_sys.sim
#unsupport run general/parser/stream.sim
#unsupport run general/parser/repeatStream.sim
run general/stable/disk.sim
run general/stable/dnode3.sim
run general/stable/metrics.sim
run general/stable/values.sim
run general/stable/vnode3.sim
run general/table/autocreate.sim
run general/table/basic1.sim
run general/table/basic2.sim
run general/table/basic3.sim
run general/table/bigint.sim
run general/table/binary.sim
run general/table/bool.sim
run general/table/column_name.sim
run general/table/column_num.sim
run general/table/column_value.sim
run general/table/column2.sim
run general/table/date.sim
run general/table/db.table.sim
run general/table/delete_reuse1.sim
run general/table/delete_reuse2.sim
run general/table/delete_writing.sim
run general/table/describe.sim
run general/table/double.sim
run general/table/fill.sim
run general/table/float.sim
run general/table/int.sim
run general/table/limit.sim
run general/table/smallint.sim
run general/table/table_len.sim
run general/table/table.sim
run general/table/tinyint.sim
run general/table/vgroup.sim
run general/tag/3.sim
run general/tag/4.sim
run general/tag/5.sim
run general/tag/6.sim
#unsupport run general/tag/add.sim
run general/tag/bigint.sim
run general/tag/binary_binary.sim
run general/tag/binary.sim
run general/tag/bool_binary.sim
run general/tag/bool_int.sim
run general/tag/bool.sim
#unsupport run general/tag/change.sim
run general/tag/column.sim
#unsupport run general/tag/commit.sim
run general/tag/create.sim
#unsupport run general/tag/delete.sim
run general/tag/double.sim
run general/tag/filter.sim
run general/tag/float.sim
run general/tag/int_binary.sim
run general/tag/int_float.sim
run general/tag/int.sim
#unsupport run general/tag/set.sim
run general/tag/smallint.sim
run general/tag/tinyint.sim
run general/user/authority.sim
run general/user/monitor.sim
run general/user/pass_alter.sim
run general/user/pass_len.sim
run general/user/user_create.sim
run general/user/user_len.sim
run general/vector/metrics_field.sim
run general/vector/metrics_mix.sim
run general/vector/metrics_query.sim
run general/vector/metrics_tag.sim
run general/vector/metrics_time.sim
run general/vector/multi.sim
run general/vector/single.sim
run general/vector/table_field.sim
run general/vector/table_mix.sim
run general/vector/table_query.sim
run general/vector/table_time.sim

View File

@ -60,17 +60,10 @@ if $rows != 75 then
return -1
endi
sql select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where tbcol = '1' group by tgcol -x step13
return -1
step13:
sql select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where ts < now + 4m and tbcol = '1' group by tgcol -x step14
return -1
step14:
sql select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where tbcol = '1' interval(1d) group by tgcol -x step15
return -1
step15:
print select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where tbcol = '1'
sql_error select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where tbcol = '1' group by tgcol
sql_error select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where ts < now + 4m and tbcol = '1' group by tgcol
sql_error select count(tbcol), avg(tbcol), sum(tbcol), min(tbcol), max(tbcol), first(tbcol), last(tbcol) from $mt where tbcol = '1' interval(1d) group by tgcol
#can't filter binary fields

View File

@ -133,6 +133,7 @@ $limit = $rowNum
$offset = $limit / 2
sql select max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) fill(value, -1, -2) limit $limit offset $offset
if $rows != $limit then
print expect $limit, actual $rows
return -1
endi
if $data01 != 0 then
@ -147,6 +148,7 @@ $limit = $rowNum
$offset = $limit / 2
sql select max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) fill(linear) limit $limit offset $offset
if $rows != $limit then
print expect $limit, actual $rows
return -1
endi
if $data01 != 0 then
@ -182,6 +184,7 @@ $limit = $rowNum
$offset = $limit / 2
sql select max(c1), min(c2), sum(c3), avg(c4), stddev(c5), spread(c6), first(c7), last(c8), first(c9) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) fill(prev) limit $limit offset $offset
if $rows != $limit then
print expect $limit, actual: $rows
return -1
endi

View File

@ -40,45 +40,47 @@
#run general/parser/nchar.sim
#sleep 2000
##run general/parser/null_char.sim
sleep 2000
run general/parser/single_row_in_tb.sim
sleep 2000
run general/parser/select_from_cache_disk.sim
sleep 2000
run general/parser/selectResNum.sim
sleep 2000
run general/parser/mixed_blocks.sim
sleep 2000
run general/parser/limit1.sim
sleep 2000
run general/parser/limit.sim
sleep 2000
run general/parser/limit1_tblocks100.sim
sleep 2000
run general/parser/select_across_vnodes.sim
sleep 2000
run general/parser/slimit1.sim
sleep 2000
run general/parser/tbnameIn.sim
sleep 2000
#sleep 2000
#run general/parser/single_row_in_tb.sim
#sleep 2000
#run general/parser/select_from_cache_disk.sim
#sleep 2000
#run general/parser/selectResNum.sim
#sleep 2000
#run general/parser/mixed_blocks.sim
#sleep 2000
#run general/parser/limit1.sim
#sleep 2000
#run general/parser/limit.sim
#sleep 2000
#run general/parser/limit1_tblocks100.sim
#sleep 2000
#run general/parser/select_across_vnodes.sim
#sleep 2000
#run general/parser/slimit1.sim
#sleep 2000
#run general/parser/tbnameIn.sim
#sleep 2000
run general/parser/projection_limit_offset.sim
sleep 2000
run general/parser/limit2.sim
sleep 2000
run general/parser/fill.sim
sleep 2000
run general/parser/fill_stb.sim
sleep 2000
run general/parser/where.sim
sleep 2000
run general/parser/slimit.sim
sleep 2000
run general/parser/select_with_tags.sim
sleep 2000
run general/parser/interp.sim
#run general/parser/fill.sim
#sleep 2000
#run general/parser/fill_stb.sim
#sleep 2000
#run general/parser/where.sim
#sleep 2000
#run general/parser/slimit.sim
#sleep 2000
#run general/parser/select_with_tags.sim
#sleep 2000
#run general/parser/interp.sim
sleep 2000
run general/parser/tags_dynamically_specifiy.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/set_tag_vals.sim
@ -97,9 +99,6 @@ sleep 2000
run general/parser/join.sim
sleep 2000
run general/parser/join_multivnode.sim
sleep 2000
run general/parser/groupby.sim
sleep 2000
run general/parser/binary_escapeCharacter.sim
sleep 2000

View File

@ -0,0 +1,49 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
print ======================== create stable
sql create database d1
$x = 0
while $x < 128
$tb = d1.s . $x
sql create table $tb (ts timestamp, i int) tags (j int)
$x = $x + 1
endw
print ======================== show stables
sql show d1.stables
print num of stables is $rows
if $rows != 128 then
return -1
endi
print ======================== create table
$x = 0
while $x < 424
$tb = d1.t . $x
sql create table $tb using d1.s0 tags( $x )
$x = $x + 1
endw
print ======================== show stables
sql show d1.tables
print num of tables is $rows
if $rows != 424 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -155,6 +155,7 @@ cd ../../../debug; make
./test.sh -f general/stable/disk.sim
./test.sh -f general/stable/dnode3.sim
./test.sh -f general/stable/metrics.sim
./test.sh -f general/stable/show.sim
./test.sh -f general/stable/values.sim
./test.sh -f general/stable/vnode3.sim

View File

@ -41,10 +41,15 @@ else
fi
TAOS_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2`
fi
BUILD_DIR=$TAOS_DIR/$BIN_DIR
BUILD_DIR=$TAOS_DIR/$BIN_DIR/build
SIM_DIR=$TAOS_DIR/sim

View File

@ -44,10 +44,15 @@ else
fi
TAOS_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2`
fi
BUILD_DIR=$TAOS_DIR/$BIN_DIR
BUILD_DIR=$TAOS_DIR/$BIN_DIR/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME

View File

@ -39,10 +39,15 @@ else
fi
TAOS_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2`
fi
BUILD_DIR=$TAOS_DIR/$BIN_DIR
BUILD_DIR=$TAOS_DIR/$BIN_DIR/build
SIM_DIR=$TAOS_DIR/sim

View File

@ -50,10 +50,15 @@ else
fi
TAOS_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2`
fi
BUILD_DIR=$TAOS_DIR/$BIN_DIR
BUILD_DIR=$TAOS_DIR/$BIN_DIR/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/$NODE_NAME

View File

@ -47,10 +47,15 @@ else
fi
TAOS_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2`
fi
BUILD_DIR=$TAOS_DIR/$BIN_DIR
BUILD_DIR=$TAOS_DIR/$BIN_DIR/build
SIM_DIR=$TAOS_DIR/sim
NODE_DIR=$SIM_DIR/arbitrator

View File

@ -49,10 +49,15 @@ else
fi
TOP_DIR=`pwd`
TAOSD_DIR=`find . -name "taosd"|grep bin|head -n1`
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
if [[ "$TAOSD_DIR" == *"$IN_TDINTERNAL"* ]]; then
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2,3`
else
BIN_DIR=`find . -name "taosd"|grep bin|head -n1|cut -d '/' --fields=2`
fi
BUILD_DIR=$TOP_DIR/$BIN_DIR
BUILD_DIR=$TOP_DIR/$BIN_DIR/build
SIM_DIR=$TOP_DIR/sim

View File

@ -75,7 +75,7 @@ endi
system_content curl -u root:taosdata -d '[{"metric": "ab1234567890123456789012345678ab1234567890123456789012345678","timestamp": 1346846400,"value": 18,"tags": {"host": "web01","group1": "1","dc": "lga"}}]' 127.0.0.1:6020/opentsdb/db/put
print $system_content
if $system_content != @{"errors":[{"datapoint":{"metric":"ab1234567890123456789012345678ab1234567890123456789012345678","stable":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb","table":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb_lga_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","host":"web01"},"status":"error","code":-2147483389}}],"failed":1,"success":0,"affected_rows":0}@ then
if $system_content != @{"errors":[{"datapoint":{"metric":"ab1234567890123456789012345678ab1234567890123456789012345678","stable":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb","table":"ab1234567890123456789012345678ab1234567890123456789012345678_d_bbb_lga_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","host":"web01"},"status":"error","code":-2147482999}}],"failed":1,"success":0,"affected_rows":0}@ then
return -1
endi
@ -125,7 +125,7 @@ endi
system_content curl -u root:taosdata -d '[{"metric": "sys_cpu","timestamp": 1346846400,"value": 18,"tags": {"host": "web01","group1": "1","group1": "1","group1": "1","group1": "1","group1": "1","dc": "lga"}}]' 127.0.0.1:6020/opentsdb/db/put
print $system_content
if $system_content != @{"errors":[{"datapoint":{"metric":"sys_cpu","stable":"sys_cpu_d_bbbbbbb","table":"sys_cpu_d_bbbbbbb_lga_1_1_1_1_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","group1":"1","group1":"1","group1":"1","group1":"1","host":"web01"},"status":"error","code":-2147483445}}],"failed":1,"success":0,"affected_rows":0}@ then
if $system_content != @{"errors":[{"datapoint":{"metric":"sys_cpu","stable":"sys_cpu_d_bbbbbbb","table":"sys_cpu_d_bbbbbbb_lga_1_1_1_1_1_web01","timestamp":1346846400,"value":18.000000,"tags":{"dc":"lga","group1":"1","group1":"1","group1":"1","group1":"1","group1":"1","host":"web01"},"status":"error","code":-2147483135}}],"failed":1,"success":0,"affected_rows":0}@ then
return -1
endi

View File

@ -9,7 +9,11 @@ NC='\033[0m'
echo "### run TSIM script ###"
cd script
./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee out.log
if [ "$1" == "cron" ]; then
./test.sh -f fullGeneralSuite.sim 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee out.log
else
./test.sh -f basicSuite.sim 2>&1 | grep 'success\|failed\|fault' | grep -v 'default' | tee out.log
fi
totalSuccess=`grep 'success' out.log | wc -l`
totalBasic=`grep success out.log | grep Suite | wc -l`

View File

@ -122,10 +122,26 @@ void* taos_execute(void *param) {
int64_t timestamp = 1530374400000L;
sprintf(sql, "insert into db.t%d values(%ld, %d, %d, %d)", pThread->index, timestamp, 0, 0, 0);
void *result = taos_query(taos, sql);
if (result == NULL) printf("error , sql:%s\n", sql);
int affectrows = taos_affected_rows(result);
if (affectrows != 1) printf("affect rows:%d, sql:%s\n", affectrows, sql);
TAOS_RES *pSql = taos_query(taos, sql);
int code = taos_errno(pSql);
if (code != 0)
{
printf("error code:%d, sql:%s\n", code, sql);
taos_free_result(pSql);
taos_close(taos);
return NULL;
}
int affectrows = taos_affected_rows(taos);
if (affectrows != 1)
{
printf("affect rows:%d, sql:%s\n", affectrows, sql);
taos_free_result(pSql);
taos_close(taos);
return NULL;
}
taos_free_result(pSql);
pSql = NULL;
timestamp -= 1000;
@ -133,17 +149,34 @@ void* taos_execute(void *param) {
for (int i = 1; i < rowNum; ++i) {
sprintf(sql, "import into db.t%d values(%ld, %d, %d, %d)", pThread->index, timestamp, i, i, i);
void * result = taos_query(taos, sql);
if (result == NULL) printf("error , sql:%s\n", sql);
int affectrows = taos_affected_rows(result);
if (affectrows != 1) printf("affect rows:%d, sql:%s\n", affectrows, sql);
pSql = taos_query(taos, sql);
code = taos_errno(pSql);
if (code != 0)
{
printf("error code:%d, sql:%s\n", code, sql);
taos_free_result(pSql);
pSql = NULL;
taos_close(taos);
return NULL;
}
int affectrows = taos_affected_rows(taos);
if (affectrows != 1) {
printf("affect rows:%d, sql:%s\n", affectrows, sql);
taos_free_result(pSql);
pSql = NULL;
taos_close(taos);
}
total_affect_rows += affectrows;
taos_free_result(pSql);
pSql = NULL;
timestamp -= 1000;
}
printf("thread:%d run finished total_affect_rows:%d\n", pThread->index, total_affect_rows);
taos_close(taos);
return NULL;
}

View File

@ -640,16 +640,15 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
for (int attempt = 0; attempt < 3; ++attempt) {
simLogSql(rest);
pSql = taos_query(script->taos, rest);
ret = terrno;
ret = taos_errno(pSql);
if (ret == TSDB_CODE_TABLE_ALREADY_EXIST ||
ret == TSDB_CODE_DB_ALREADY_EXIST) {
if (ret == TSDB_CODE_TABLE_ALREADY_EXIST || ret == TSDB_CODE_DB_ALREADY_EXIST) {
simTrace("script:%s, taos:%p, %s success, ret:%d:%s", script->fileName, script->taos, rest, ret, tstrerror(ret));
ret = 0;
break;
} else if (ret != 0) {
simTrace("script:%s, taos:%p, %s failed, ret:%d:%s, error:%s",
script->fileName, script->taos, rest, ret, tstrerror(ret), taos_errstr(script->taos));
script->fileName, script->taos, rest, ret, tstrerror(ret), taos_errstr(pSql));
if (line->errorJump == SQL_JUMP_TRUE) {
script->linePos = line->jump;
@ -659,6 +658,8 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
} else {
break;
}
taos_free_result(pSql);
}
if (ret) {
@ -771,11 +772,11 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
}
}
taos_free_result(pSql);
} else {
numOfRows = taos_affected_rows(pSql);
}
taos_free_result(pSql);
sprintf(script->rows, "%d", numOfRows);
script->linePos++;
@ -922,8 +923,7 @@ bool simExecuteSqlErrorCmd(SScript *script, char *rest) {
}
else {
pSql = taos_query(script->taos, rest);
ret = terrno;
ret = taos_errno(pSql);
taos_free_result(pSql);
}