Merge branch 'develop' into test/jenkins

This commit is contained in:
liuyq-617 2021-04-09 18:30:39 +08:00
commit fa55a5f164
33 changed files with 6628 additions and 1285 deletions

24
Jenkinsfile vendored
View File

@ -46,13 +46,20 @@ def pre_test(){
''' '''
script { script {
if (env.CHANGE_TARGET == 'master') { if (env.CHANGE_TARGET == 'master') {
sh 'git checkout master' sh '''
cd ${WKC}
git checkout master
'''
} }
else { else {
sh 'git checkout develop' sh '''
cd ${WKC}
git checkout develop
'''
} }
} }
sh''' sh'''
cd ${WKC}
git pull >/dev/null git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD git checkout -qf FETCH_HEAD
@ -62,15 +69,22 @@ def pre_test(){
''' '''
script { script {
if (env.CHANGE_TARGET == 'master') { if (env.CHANGE_TARGET == 'master') {
sh 'git checkout master' sh '''
cd ${WK}
git checkout master
'''
} }
else { else {
sh 'git checkout develop' sh '''
cd ${WK}
git checkout develop
'''
} }
} }
sh ''' sh '''
git pull >/dev/null
cd ${WK} cd ${WK}
git pull >/dev/null
export TZ=Asia/Harbin export TZ=Asia/Harbin
date date
git clean -dfx git clean -dfx

View File

@ -96,6 +96,25 @@ typedef struct STableMetaInfo {
SArray *tagColList; // SArray<SColumn*>, involved tag columns SArray *tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo; } STableMetaInfo;
typedef struct SColumnIndex {
int16_t tableIndex;
int16_t columnIndex;
} SColumnIndex;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SColumn {
SColumnIndex colIndex;
int32_t numOfFilters;
SColumnFilterInfo *filterInfo;
} SColumn;
/* the structure for sql function in select clause */ /* the structure for sql function in select clause */
typedef struct SSqlExpr { typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
@ -109,32 +128,24 @@ typedef struct SSqlExpr {
tVariant param[3]; // parameters are not more than 3 tVariant param[3]; // parameters are not more than 3
int32_t offset; // sub result column value of arithmetic expression. int32_t offset; // sub result column value of arithmetic expression.
int16_t resColId; // result column id int16_t resColId; // result column id
SColumn *pFilter; // expr filter
} SSqlExpr; } SSqlExpr;
typedef struct SColumnIndex { typedef struct SExprFilter {
int16_t tableIndex; tSqlExpr *pExpr; //used for having parse
int16_t columnIndex; SSqlExpr *pSqlExpr;
} SColumnIndex; SArray *fp;
SColumn *pFilters; //having filter info
}SExprFilter;
typedef struct SInternalField { typedef struct SInternalField {
TAOS_FIELD field; TAOS_FIELD field;
bool visible; bool visible;
SExprInfo *pArithExprInfo; SExprInfo *pArithExprInfo;
SSqlExpr *pSqlExpr; SSqlExpr *pSqlExpr;
SExprFilter *pFieldFilters;
} SInternalField; } SInternalField;
typedef struct SFieldInfo {
int16_t numOfOutput; // number of column in result
TAOS_FIELD* final;
SArray *internalField; // SArray<SInternalField>
} SFieldInfo;
typedef struct SColumn {
SColumnIndex colIndex;
int32_t numOfFilters;
SColumnFilterInfo *filterInfo;
} SColumn;
typedef struct SCond { typedef struct SCond {
uint64_t uid; uint64_t uid;
int32_t len; // length of tag query condition data int32_t len; // length of tag query condition data
@ -243,6 +254,7 @@ typedef struct SQueryInfo {
int32_t round; // 0/1/.... int32_t round; // 0/1/....
int32_t bufLen; int32_t bufLen;
char* buf; char* buf;
int32_t havingFieldNum;
} SQueryInfo; } SQueryInfo;
typedef struct { typedef struct {

View File

@ -22,6 +22,7 @@
#include "tscUtil.h" #include "tscUtil.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "tsclient.h" #include "tsclient.h"
#include "qUtil.h"
typedef struct SCompareParam { typedef struct SCompareParam {
SLocalDataSource **pLocalData; SLocalDataSource **pLocalData;
@ -1243,6 +1244,76 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
return false; return false;
} }
bool doFilterFieldData(char *input, SExprFilter* pFieldFilters, int16_t type, bool* notSkipped) {
bool qualified = false;
for(int32_t k = 0; k < pFieldFilters->pFilters->numOfFilters; ++k) {
__filter_func_t fp = taosArrayGetP(pFieldFilters->fp, k);
SColumnFilterElem filterElem = {.filterInfo = pFieldFilters->pFilters->filterInfo[k]};
bool isnull = isNull(input, type);
if (isnull) {
if (fp == isNullOperator) {
qualified = true;
break;
} else {
continue;
}
} else {
if (fp == notNullOperator) {
qualified = true;
break;
} else if (fp == isNullOperator) {
continue;
}
}
if (fp(&filterElem, input, input, type)) {
qualified = true;
break;
}
}
*notSkipped = qualified;
return TSDB_CODE_SUCCESS;
}
int32_t doHavingFilter(SQueryInfo* pQueryInfo, tFilePage* pOutput, bool* notSkipped) {
*notSkipped = true;
if (pQueryInfo->havingFieldNum <= 0) {
return TSDB_CODE_SUCCESS;
}
//int32_t exprNum = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
size_t numOfOutput = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfOutput; ++i) {
SInternalField* pInterField = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, i);
SExprFilter* pFieldFilters = pInterField->pFieldFilters;
if (pFieldFilters == NULL) {
continue;
}
int32_t type = pInterField->field.type;
char* pInput = pOutput->data + pOutput->num* pFieldFilters->pSqlExpr->offset;
doFilterFieldData(pInput, pFieldFilters, type, notSkipped);
if (*notSkipped == false) {
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
/** /**
* *
* @param pSql * @param pSql
@ -1283,6 +1354,22 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren
doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize); doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalMerge->finalModel->rowSize);
} }
bool notSkipped = true;
doHavingFilter(pQueryInfo, pResBuf, &notSkipped);
if (!notSkipped) {
pRes->numOfRows = 0;
pLocalMerge->discard = !noMoreCurrentGroupRes;
if (pLocalMerge->discard) {
SColumnModel *pInternModel = pLocalMerge->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalMerge->discardData, pLocalMerge->pTempBuffer->data, 0, 1, 1);
}
return notSkipped;
}
// no interval query, no fill operation // no interval query, no fill operation
if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) { if (pQueryInfo->interval.interval == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo); genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo);

View File

@ -34,6 +34,7 @@
#include "tstoken.h" #include "tstoken.h"
#include "tstrbuild.h" #include "tstrbuild.h"
#include "ttokendef.h" #include "ttokendef.h"
#include "qUtil.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
@ -1097,6 +1098,7 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd) {
return true; return true;
} }
static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pCmd) { static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pCmd) {
assert(pTagsList != NULL); assert(pTagsList != NULL);
@ -1676,18 +1678,6 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, SArray* pSelectLis
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
} }
/*
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
*/
if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3065,6 +3055,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) { static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) {
if (pColumn == NULL) { if (pColumn == NULL) {
return NULL; return NULL;
@ -3088,15 +3079,11 @@ static SColumnFilterInfo* addColumnFilterInfo(SColumn* pColumn) {
} }
static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter, static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SColumnFilterInfo* pColumnFilter,
SColumnIndex* columnIndex, tSqlExpr* pExpr) { int16_t colType, tSqlExpr* pExpr) {
const char* msg = "not supported filter condition"; const char* msg = "not supported filter condition";
tSqlExpr* pRight = pExpr->pRight; tSqlExpr* pRight = pExpr->pRight;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, columnIndex->tableIndex);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, columnIndex->columnIndex);
int16_t colType = pSchema->type;
if (colType >= TSDB_DATA_TYPE_TINYINT && colType <= TSDB_DATA_TYPE_BIGINT) { if (colType >= TSDB_DATA_TYPE_TINYINT && colType <= TSDB_DATA_TYPE_BIGINT) {
colType = TSDB_DATA_TYPE_BIGINT; colType = TSDB_DATA_TYPE_BIGINT;
} else if (colType == TSDB_DATA_TYPE_FLOAT || colType == TSDB_DATA_TYPE_DOUBLE) { } else if (colType == TSDB_DATA_TYPE_FLOAT || colType == TSDB_DATA_TYPE_DOUBLE) {
@ -3301,7 +3288,10 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
} }
pColumn->colIndex = *pIndex; pColumn->colIndex = *pIndex;
return doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pIndex, pExpr);
int16_t colType = pSchema->type;
return doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, colType, pExpr);
} }
static int32_t getTablenameCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pTableCond, SStringBuilder* sb) { static int32_t getTablenameCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pTableCond, SStringBuilder* sb) {
@ -6030,7 +6020,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
if (TSDB_COL_IS_TAG(pColIndex->flag)) { if (TSDB_COL_IS_TAG(pColIndex->flag)) {
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex}; SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true); SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, (int32_t)size - pQueryInfo->havingFieldNum, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true);
memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName)); memset(pExpr->aliasName, 0, sizeof(pExpr->aliasName));
tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName)); tstrncpy(pExpr->aliasName, name, sizeof(pExpr->aliasName));
@ -6039,7 +6029,7 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
// NOTE: tag column does not add to source column list // NOTE: tag column does not add to source column list
SColumnList ids = getColumnList(1, 0, pColIndex->colIndex); SColumnList ids = getColumnList(1, 0, pColIndex->colIndex);
insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr); insertResultField(pQueryInfo, (int32_t)size - pQueryInfo->havingFieldNum, &ids, bytes, (int8_t)type, name, pExpr);
} else { } else {
// if this query is "group by" normal column, time window query is not allowed // if this query is "group by" normal column, time window query is not allowed
if (isTimeWindowQuery(pQueryInfo)) { if (isTimeWindowQuery(pQueryInfo)) {
@ -6166,7 +6156,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
} }
// projection query on super table does not compatible with "group by" syntax // projection query on super table does not compatible with "group by" syntax
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { if (tscIsProjectionQuery(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3); return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
} }
@ -6769,6 +6759,313 @@ static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tscInsertExprFields(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** interField) {
tSqlExprItem item = {.pNode = pExpr, .aliasName = NULL, .distinct = false};
int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
// ADD TRUE FOR TEST
if (addExprAndResultField(pCmd, pQueryInfo, outputIndex, &item, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
++pQueryInfo->havingFieldNum;
size_t n = tscSqlExprNumOfExprs(pQueryInfo);
SSqlExpr* pSqlExpr = tscSqlExprGet(pQueryInfo, (int32_t)n - 1);
int32_t slot = tscNumOfFields(pQueryInfo) - 1;
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, slot);
pInfo->visible = false;
if (pInfo->pFieldFilters == NULL) {
SExprFilter* pFieldFilters = calloc(1, sizeof(SExprFilter));
if (pFieldFilters == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SColumn* pFilters = calloc(1, sizeof(SColumn));
if (pFilters == NULL) {
tfree(pFieldFilters);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pFieldFilters->pFilters = pFilters;
pFieldFilters->pSqlExpr = pSqlExpr;
pSqlExpr->pFilter = pFilters;
pInfo->pFieldFilters = pFieldFilters;
}
pInfo->pFieldFilters->pExpr = pExpr;
*interField = pInfo;
return TSDB_CODE_SUCCESS;
}
int32_t tscGetExprFilters(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SInternalField** pField) {
SInternalField* pInfo = NULL;
for (int32_t i = pQueryInfo->havingFieldNum - 1; i >= 0; --i) {
pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, pQueryInfo->fieldsInfo.numOfOutput - 1 - i);
if (pInfo->pFieldFilters && 0 == tSqlExprCompare(pInfo->pFieldFilters->pExpr, pExpr)) {
*pField = pInfo;
return TSDB_CODE_SUCCESS;
}
}
int32_t ret = tscInsertExprFields(pCmd, pQueryInfo, pExpr, &pInfo);
if (ret) {
return ret;
}
*pField = pInfo;
return TSDB_CODE_SUCCESS;
}
static int32_t genExprFilter(SExprFilter * exprFilter) {
exprFilter->fp = taosArrayInit(4, sizeof(__filter_func_t));
if (exprFilter->fp == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < exprFilter->pFilters->numOfFilters; ++i) {
SColumnFilterInfo *filterInfo = &exprFilter->pFilters->filterInfo[i];
int32_t lower = filterInfo->lowerRelOptr;
int32_t upper = filterInfo->upperRelOptr;
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
tscError("invalid rel optr");
return TSDB_CODE_TSC_APP_ERROR;
}
__filter_func_t ffp = getFilterOperator(lower, upper);
if (ffp == NULL) {
tscError("invalid filter info");
return TSDB_CODE_TSC_APP_ERROR;
}
taosArrayPush(exprFilter->fp, &ffp);
}
return TSDB_CODE_SUCCESS;
}
static int32_t handleExprInHavingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t sqlOptr) {
const char* msg1 = "non binary column not support like operator";
const char* msg2 = "invalid operator for binary column in having clause";
const char* msg3 = "invalid operator for bool column in having clause";
SColumn* pColumn = NULL;
SColumnFilterInfo* pColFilter = NULL;
SInternalField* pInfo = NULL;
/*
* in case of TK_AND filter condition, we first find the corresponding column and build the query condition together
* the already existed condition.
*/
if (sqlOptr == TK_AND) {
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo);
if (ret) {
return ret;
}
pColumn = pInfo->pFieldFilters->pFilters;
// this is a new filter condition on this column
if (pColumn->numOfFilters == 0) {
pColFilter = addColumnFilterInfo(pColumn);
} else { // update the existed column filter information, find the filter info here
pColFilter = &pColumn->filterInfo[0];
}
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else if (sqlOptr == TK_OR) {
int32_t ret = tscGetExprFilters(pCmd, pQueryInfo, pExpr->pLeft, &pInfo);
if (ret) {
return ret;
}
pColumn = pInfo->pFieldFilters->pFilters;
// TODO fixme: failed to invalid the filter expression: "col1 = 1 OR col2 = 2"
pColFilter = addColumnFilterInfo(pColumn);
if (pColFilter == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
} else { // error;
return TSDB_CODE_TSC_INVALID_SQL;
}
pColFilter->filterstr =
((pInfo->field.type == TSDB_DATA_TYPE_BINARY || pInfo->field.type == TSDB_DATA_TYPE_NCHAR) ? 1 : 0);
if (pColFilter->filterstr) {
if (pExpr->tokenId != TK_EQ
&& pExpr->tokenId != TK_NE
&& pExpr->tokenId != TK_ISNULL
&& pExpr->tokenId != TK_NOTNULL
&& pExpr->tokenId != TK_LIKE
) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
} else {
if (pExpr->tokenId == TK_LIKE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pInfo->field.type == TSDB_DATA_TYPE_BOOL) {
if (pExpr->tokenId != TK_EQ && pExpr->tokenId != TK_NE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
}
int32_t ret = doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pInfo->field.type, pExpr);
if (ret) {
return ret;
}
return genExprFilter(pInfo->pFieldFilters);
}
int32_t getHavingExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr* pExpr, int32_t parentOptr) {
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
const char* msg1 = "invalid having clause";
tSqlExpr* pLeft = pExpr->pLeft;
tSqlExpr* pRight = pExpr->pRight;
if (pExpr->tokenId == TK_AND || pExpr->tokenId == TK_OR) {
int32_t ret = getHavingExpr(pCmd, pQueryInfo, pExpr->pLeft, pExpr->tokenId);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return getHavingExpr(pCmd, pQueryInfo, pExpr->pRight, pExpr->tokenId);
}
if (pLeft == NULL || pRight == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pLeft->type == pRight->type) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
exchangeExpr(pExpr);
pLeft = pExpr->pLeft;
pRight = pExpr->pRight;
if (pLeft->type != SQL_NODE_SQLFUNCTION) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pRight->type != SQL_NODE_VALUE) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pExpr->tokenId >= TK_BITAND) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
//if (pLeft->pParam == NULL || pLeft->pParam->nExpr < 1) {
// return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
//}
if (pLeft->pParam) {
size_t size = taosArrayGetSize(pLeft->pParam);
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pParamElem = taosArrayGet(pLeft->pParam, i);
if (pParamElem->pNode->tokenId != TK_ALL &&
pParamElem->pNode->tokenId != TK_ID &&
pParamElem->pNode->tokenId != TK_STRING &&
pParamElem->pNode->tokenId != TK_INTEGER &&
pParamElem->pNode->tokenId != TK_FLOAT) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pParamElem->pNode->tokenId == TK_ID && (pParamElem->pNode->colInfo.z == NULL && pParamElem->pNode->colInfo.n == 0)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pParamElem->pNode->tokenId == TK_ID) {
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if ((getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
if (index.columnIndex <= 0 ||
index.columnIndex >= tscGetNumOfColumns(pTableMeta)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
}
}
}
pLeft->functionId = isValidFunction(pLeft->operand.z, pLeft->operand.n);
if (pLeft->functionId < 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
return handleExprInHavingClause(pCmd, pQueryInfo, pExpr, parentOptr);
}
int32_t parseHavingClause(SQueryInfo* pQueryInfo, tSqlExpr* pExpr, SSqlCmd* pCmd, bool isSTable, int32_t joinQuery, int32_t timeWindowQuery) {
const char* msg1 = "having only works with group by";
const char* msg2 = "functions or others can not be mixed up";
const char* msg3 = "invalid expression in having clause";
if (pExpr == NULL) {
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->groupbyExpr.numOfGroupCols <= 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pExpr->pLeft == NULL || pExpr->pRight == NULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
int32_t ret = 0;
if ((ret = getHavingExpr(pCmd, pQueryInfo, pExpr, TK_AND)) != TSDB_CODE_SUCCESS) {
return ret;
}
//REDO function check
if (!functionCompatibleCheck(pQueryInfo, joinQuery, timeWindowQuery)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
return TSDB_CODE_SUCCESS;
}
int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) { int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t index) {
assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0)); assert(pQuerySqlNode != NULL && (pQuerySqlNode->from == NULL || taosArrayGetSize(pQuerySqlNode->from->tableList) > 0));
@ -6934,6 +7231,23 @@ int32_t doValidateSqlNode(SSqlObj* pSql, SQuerySqlNode* pQuerySqlNode, int32_t i
} }
} }
// parse the having clause in the first place
if (parseHavingClause(pQueryInfo, pQuerySqlNode->pHaving, pCmd, isSTable, joinQuery, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
}
/*
* transfer sql functions that need secondary merge into another format
* in dealing with super table queries such as: count/first/last
*/
if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
return TSDB_CODE_TSC_INVALID_SQL;
}
}
if (parseSessionClause(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) { if (parseSessionClause(pCmd, pQueryInfo, pQuerySqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL; return TSDB_CODE_TSC_INVALID_SQL;
} }
@ -7125,3 +7439,10 @@ bool hasNormalColumnFilter(SQueryInfo* pQueryInfo) {
return false; return false;
} }

View File

@ -862,8 +862,44 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pSqlFuncExpr->functionId = htons(pExpr->functionId); pSqlFuncExpr->functionId = htons(pExpr->functionId);
pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams); pSqlFuncExpr->numOfParams = htons(pExpr->numOfParams);
pSqlFuncExpr->resColId = htons(pExpr->resColId); pSqlFuncExpr->resColId = htons(pExpr->resColId);
if (pTableMeta->tableType != TSDB_SUPER_TABLE && pExpr->pFilter && pExpr->pFilter->numOfFilters > 0) {
pSqlFuncExpr->filterNum = htonl(pExpr->pFilter->numOfFilters);
} else {
pSqlFuncExpr->filterNum = 0;
}
pMsg += sizeof(SSqlFuncMsg); pMsg += sizeof(SSqlFuncMsg);
if (pSqlFuncExpr->filterNum) {
pMsg += sizeof(SColumnFilterInfo) * pExpr->pFilter->numOfFilters;
// append the filter information after the basic column information
for (int32_t f = 0; f < pExpr->pFilter->numOfFilters; ++f) {
SColumnFilterInfo *pColFilter = &pExpr->pFilter->filterInfo[f];
SColumnFilterInfo *pFilterMsg = &pSqlFuncExpr->filterInfo[f];
pFilterMsg->filterstr = htons(pColFilter->filterstr);
if (pColFilter->filterstr) {
pFilterMsg->len = htobe64(pColFilter->len);
memcpy(pMsg, (void *)pColFilter->pz, (size_t)(pColFilter->len + 1));
pMsg += (pColFilter->len + 1); // append the additional filter binary info
} else {
pFilterMsg->lowerBndi = htobe64(pColFilter->lowerBndi);
pFilterMsg->upperBndi = htobe64(pColFilter->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pColFilter->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pColFilter->upperRelOptr);
if (pColFilter->lowerRelOptr == TSDB_RELATION_INVALID && pColFilter->upperRelOptr == TSDB_RELATION_INVALID) {
tscError("invalid filter info");
return TSDB_CODE_TSC_INVALID_SQL;
}
}
}
for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log for (int32_t j = 0; j < pExpr->numOfParams; ++j) { // todo add log
pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType); pSqlFuncExpr->arg[j].argType = htons((uint16_t)pExpr->param[j].nType);
pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen); pSqlFuncExpr->arg[j].argBytes = htons(pExpr->param[j].nLen);

View File

@ -1045,6 +1045,7 @@ SInternalField* tscFieldInfoAppend(SFieldInfo* pFieldInfo, TAOS_FIELD* pField) {
.pSqlExpr = NULL, .pSqlExpr = NULL,
.pArithExprInfo = NULL, .pArithExprInfo = NULL,
.visible = true, .visible = true,
.pFieldFilters = NULL,
}; };
info.field = *pField; info.field = *pField;
@ -1057,6 +1058,7 @@ SInternalField* tscFieldInfoInsert(SFieldInfo* pFieldInfo, int32_t index, TAOS_F
.pSqlExpr = NULL, .pSqlExpr = NULL,
.pArithExprInfo = NULL, .pArithExprInfo = NULL,
.visible = true, .visible = true,
.pFieldFilters = NULL,
}; };
info.field = *field; info.field = *field;
@ -1130,6 +1132,22 @@ int32_t tscGetResRowLength(SArray* pExprList) {
return size; return size;
} }
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) {
tfree(pFilterInfo[i].pz);
}
}
tfree(pFilterInfo);
}
static void tscColumnDestroy(SColumn* pCol) {
destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters);
free(pCol);
}
void tscFieldInfoClear(SFieldInfo* pFieldInfo) { void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
if (pFieldInfo == NULL) { if (pFieldInfo == NULL) {
return; return;
@ -1150,6 +1168,11 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
tfree(pInfo->pArithExprInfo); tfree(pInfo->pArithExprInfo);
} }
if (pInfo->pFieldFilters != NULL) {
tscColumnDestroy(pInfo->pFieldFilters->pFilters);
tfree(pInfo->pFieldFilters);
}
} }
taosArrayDestroy(pFieldInfo->internalField); taosArrayDestroy(pFieldInfo->internalField);
@ -1411,15 +1434,7 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex) {
return taosArrayGetP(pColumnList, i); return taosArrayGetP(pColumnList, i);
} }
static void destroyFilterInfo(SColumnFilterInfo* pFilterInfo, int32_t numOfFilters) {
for(int32_t i = 0; i < numOfFilters; ++i) {
if (pFilterInfo[i].filterstr) {
tfree(pFilterInfo[i].pz);
}
}
tfree(pFilterInfo);
}
SColumn* tscColumnClone(const SColumn* src) { SColumn* tscColumnClone(const SColumn* src) {
assert(src != NULL); assert(src != NULL);
@ -1436,10 +1451,6 @@ SColumn* tscColumnClone(const SColumn* src) {
return dst; return dst;
} }
static void tscColumnDestroy(SColumn* pCol) {
destroyFilterInfo(pCol->filterInfo, pCol->numOfFilters);
free(pCol);
}
void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) { void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) {
assert(src != NULL && dst != NULL); assert(src != NULL && dst != NULL);

View File

@ -399,36 +399,6 @@ typedef struct SColIndex {
char name[TSDB_COL_NAME_LEN]; // TODO remove it char name[TSDB_COL_NAME_LEN]; // TODO remove it
} SColIndex; } SColIndex;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
} SSqlFuncMsg;
typedef struct SExprInfo {
SSqlFuncMsg base;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int32_t interBytes;
int64_t uid;
} SExprInfo;
typedef struct SColumnFilterInfo { typedef struct SColumnFilterInfo {
int16_t lowerRelOptr; int16_t lowerRelOptr;
@ -451,6 +421,42 @@ typedef struct SColumnFilterInfo {
}; };
} SColumnFilterInfo; } SColumnFilterInfo;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncMsg {
int16_t functionId;
int16_t numOfParams;
int16_t resColId; // result column id, id of the current output column
int16_t colType;
int16_t colBytes;
SColIndex colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
int32_t filterNum;
SColumnFilterInfo filterInfo[];
} SSqlFuncMsg;
typedef struct SExprInfo {
SColumnFilterInfo * pFilter;
struct tExprNode* pExpr;
int16_t bytes;
int16_t type;
int32_t interBytes;
int64_t uid;
SSqlFuncMsg base;
} SExprInfo;
/* /*
* for client side struct, we only need the column id, type, bytes are not necessary * for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information. * But for data in vnode side, we need all the following information.

View File

@ -205,6 +205,11 @@
#define TK_VALUES 186 #define TK_VALUES 186
#define TK_SPACE 300 #define TK_SPACE 300
#define TK_COMMENT 301 #define TK_COMMENT 301
#define TK_ILLEGAL 302 #define TK_ILLEGAL 302

View File

@ -27,7 +27,7 @@
#define MAX_IP_SIZE 20 #define MAX_IP_SIZE 20
#define MAX_PASSWORD_SIZE 20 #define MAX_PASSWORD_SIZE 20
#define MAX_HISTORY_SIZE 1000 #define MAX_HISTORY_SIZE 1000
#define MAX_COMMAND_SIZE 65536 #define MAX_COMMAND_SIZE 1048586
#define HISTORY_FILE ".taos_history" #define HISTORY_FILE ".taos_history"
#define DEFAULT_RES_SHOW_NUM 100 #define DEFAULT_RES_SHOW_NUM 100

View File

@ -238,7 +238,7 @@ void resetCommand(Command *cmd, const char s[]) {
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size); clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
memset(cmd->buffer, 0, MAX_COMMAND_SIZE); memset(cmd->buffer, 0, MAX_COMMAND_SIZE);
memset(cmd->command, 0, MAX_COMMAND_SIZE); memset(cmd->command, 0, MAX_COMMAND_SIZE);
strcpy(cmd->command, s); strncpy(cmd->command, s, MAX_COMMAND_SIZE);
int size = 0; int size = 0;
int width = 0; int width = 0;
getMbSizeInfo(s, &size, &width); getMbSizeInfo(s, &size, &width);

View File

@ -67,6 +67,12 @@ enum TEST_MODE {
INVAID_TEST INVAID_TEST
}; };
enum QUERY_MODE {
SYNC_QUERY_MODE, // 0
ASYNC_QUERY_MODE, // 1
INVALID_MODE
};
#define MAX_SQL_SIZE 65536 #define MAX_SQL_SIZE 65536
#define BUFFER_SIZE (65536*2) #define BUFFER_SIZE (65536*2)
#define MAX_USERNAME_SIZE 64 #define MAX_USERNAME_SIZE 64
@ -198,7 +204,7 @@ typedef struct SArguments_S {
bool verbose_print; bool verbose_print;
bool performance_print; bool performance_print;
char * output_file; char * output_file;
int mode; int query_mode;
char * datatype[MAX_NUM_DATATYPE + 1]; char * datatype[MAX_NUM_DATATYPE + 1];
int len_of_binary; int len_of_binary;
int num_of_CPR; int num_of_CPR;
@ -351,7 +357,7 @@ typedef struct SpecifiedQueryInfo_S {
int rate; // 0: unlimit > 0 loop/s int rate; // 0: unlimit > 0 loop/s
int concurrent; int concurrent;
int sqlCount; int sqlCount;
int subscribeMode; // 0: sync, 1: async int mode; // 0: sync, 1: async
int subscribeInterval; // ms int subscribeInterval; // ms
int queryTimes; int queryTimes;
int subscribeRestart; int subscribeRestart;
@ -365,7 +371,7 @@ typedef struct SuperQueryInfo_S {
char sTblName[MAX_TB_NAME_SIZE+1]; char sTblName[MAX_TB_NAME_SIZE+1];
int rate; // 0: unlimit > 0 loop/s int rate; // 0: unlimit > 0 loop/s
int threadCnt; int threadCnt;
int subscribeMode; // 0: sync, 1: async int mode; // 0: sync, 1: async
int subscribeInterval; // ms int subscribeInterval; // ms
int subscribeRestart; int subscribeRestart;
int subscribeKeepProgress; int subscribeKeepProgress;
@ -429,6 +435,8 @@ typedef struct SThreadInfo_S {
int64_t maxDelay; int64_t maxDelay;
int64_t minDelay; int64_t minDelay;
// query
int querySeq; // sequence number of sql command
} threadInfo; } threadInfo;
#ifdef WINDOWS #ifdef WINDOWS
@ -714,7 +722,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
} else if (strcmp(argv[i], "-s") == 0) { } else if (strcmp(argv[i], "-s") == 0) {
arguments->sqlFile = argv[++i]; arguments->sqlFile = argv[++i];
} else if (strcmp(argv[i], "-q") == 0) { } else if (strcmp(argv[i], "-q") == 0) {
arguments->mode = atoi(argv[++i]); arguments->query_mode = atoi(argv[++i]);
} else if (strcmp(argv[i], "-T") == 0) { } else if (strcmp(argv[i], "-T") == 0) {
arguments->num_of_threads = atoi(argv[++i]); arguments->num_of_threads = atoi(argv[++i]);
} else if (strcmp(argv[i], "-i") == 0) { } else if (strcmp(argv[i], "-i") == 0) {
@ -986,7 +994,8 @@ static void getResult(TAOS_RES *res, char* resultFileName) {
static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) { static void selectAndGetResult(TAOS *taos, char *command, char* resultFileName) {
TAOS_RES *res = taos_query(taos, command); TAOS_RES *res = taos_query(taos, command);
if (res == NULL || taos_errno(res) != 0) { if (res == NULL || taos_errno(res) != 0) {
printf("failed to sql:%s, reason:%s\n", command, taos_errstr(res)); errorPrint("%s() LN%d, failed to execute sql:%s, reason:%s\n",
__func__, __LINE__, command, taos_errstr(res));
taos_free_result(res); taos_free_result(res);
return; return;
} }
@ -1163,7 +1172,8 @@ static int printfInsertMeta() {
if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) {
if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2))
|| (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) {
printf(" precision: \033[33m%s\033[0m\n", g_Dbs.db[i].dbCfg.precision); printf(" precision: \033[33m%s\033[0m\n",
g_Dbs.db[i].dbCfg.precision);
} else { } else {
printf("\033[1m\033[40;31m precision error: %s\033[0m\n", printf("\033[1m\033[40;31m precision error: %s\033[0m\n",
g_Dbs.db[i].dbCfg.precision); g_Dbs.db[i].dbCfg.precision);
@ -1171,11 +1181,13 @@ static int printfInsertMeta() {
} }
} }
printf(" super table count: \033[33m%d\033[0m\n", g_Dbs.db[i].superTblCount); printf(" super table count: \033[33m%d\033[0m\n",
g_Dbs.db[i].superTblCount);
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
printf(" super table[\033[33m%d\033[0m]:\n", j); printf(" super table[\033[33m%d\033[0m]:\n", j);
printf(" stbName: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sTblName); printf(" stbName: \033[33m%s\033[0m\n",
g_Dbs.db[i].superTbls[j].sTblName);
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
@ -1459,41 +1471,61 @@ static void printfQueryMeta() {
printf("\n"); printf("\n");
printf("specified table query info: \n"); printf("specified table query info: \n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.rate); printf("query interval: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.rate);
printf("top query times:\033[33m%d\033[0m\n", g_args.query_times); printf("top query times:\033[33m%d\033[0m\n", g_args.query_times);
printf("concurrent: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.concurrent); printf("concurrent: \033[33m%d\033[0m\n",
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.sqlCount); g_queryInfo.specifiedQueryInfo.concurrent);
printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.sqlCount);
printf("specified tbl query times:\n"); printf("specified tbl query times:\n");
printf(" \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.queryTimes); printf(" \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.queryTimes);
if (SUBSCRIBE_TEST == g_args.test_mode) { if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeMode); printf("mod: \033[33m%d\033[0m\n",
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.mode);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeRestart); printf("interval: \033[33m%d\033[0m\n",
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.specifiedQueryInfo.sql[i]); printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.specifiedQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
printf("super table query info:\n"); printf("super table query info:\n");
printf("query interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.rate); printf("query interval: \033[33m%d\033[0m\n",
printf("threadCnt: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.threadCnt); g_queryInfo.superQueryInfo.rate);
printf("childTblCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.childTblCount); printf("threadCnt: \033[33m%d\033[0m\n",
printf("stable name: \033[33m%s\033[0m\n", g_queryInfo.superQueryInfo.sTblName); g_queryInfo.superQueryInfo.threadCnt);
printf("stb query times:\033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.queryTimes); printf("childTblCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.childTblCount);
printf("stable name: \033[33m%s\033[0m\n",
g_queryInfo.superQueryInfo.sTblName);
printf("stb query times:\033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.queryTimes);
if (SUBSCRIBE_TEST == g_args.test_mode) { if (SUBSCRIBE_TEST == g_args.test_mode) {
printf("mod: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeMode); printf("mod: \033[33m%d\033[0m\n",
printf("interval: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.mode);
printf("restart: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeRestart); printf("interval: \033[33m%d\033[0m\n",
printf("keepProgress: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeInterval);
printf("restart: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeRestart);
printf("keepProgress: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress);
} }
printf("sqlCount: \033[33m%d\033[0m\n", g_queryInfo.superQueryInfo.sqlCount); printf("sqlCount: \033[33m%d\033[0m\n",
g_queryInfo.superQueryInfo.sqlCount);
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
printf(" sql[%d]: \033[33m%s\033[0m\n", i, g_queryInfo.superQueryInfo.sql[i]); printf(" sql[%d]: \033[33m%s\033[0m\n",
i, g_queryInfo.superQueryInfo.sql[i]);
} }
printf("\n"); printf("\n");
@ -1670,7 +1702,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]); dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]); dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX])); dbInfos[count]->cachelast =
(int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
tstrncpy(dbInfos[count]->precision, tstrncpy(dbInfos[count]->precision,
(char *)row[TSDB_SHOW_DB_PRECISION_INDEX], (char *)row[TSDB_SHOW_DB_PRECISION_INDEX],
@ -1681,7 +1714,8 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
count++; count++;
if (count > MAX_DATABASE_COUNT) { if (count > MAX_DATABASE_COUNT) {
errorPrint( "The database count overflow than %d\n", MAX_DATABASE_COUNT); errorPrint("%s() LN%d, The database count overflow than %d\n",
__func__, __LINE__, MAX_DATABASE_COUNT);
break; break;
} }
} }
@ -1691,6 +1725,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
static void printfDbInfoForQueryToFile( static void printfDbInfoForQueryToFile(
char* filename, SDbInfo* dbInfos, int index) { char* filename, SDbInfo* dbInfos, int index) {
if (filename[0] == 0) if (filename[0] == 0)
return; return;
@ -1951,7 +1986,8 @@ static int postProceSql(char* host, uint16_t port, char* sqlstr)
static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) { static char* getTagValueFromTagSample(SSuperTable* stbInfo, int tagUsePos) {
char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1); char* dataBuf = (char*)calloc(TSDB_MAX_SQL_LEN+1, 1);
if (NULL == dataBuf) { if (NULL == dataBuf) {
errorPrint("%s() LN%d, calloc failed! size:%d\n", __func__, __LINE__, TSDB_MAX_SQL_LEN+1); errorPrint("%s() LN%d, calloc failed! size:%d\n",
__func__, __LINE__, TSDB_MAX_SQL_LEN+1);
return NULL; return NULL;
} }
@ -2573,10 +2609,7 @@ static void* createTable(void *sarg)
int64_t lastPrintTime = taosGetTimestampMs(); int64_t lastPrintTime = taosGetTimestampMs();
int buff_len; int buff_len;
if (superTblInfo) buff_len = BUFFER_SIZE / 8;
buff_len = superTblInfo->maxSqlLen;
else
buff_len = BUFFER_SIZE;
char *buffer = calloc(buff_len, 1); char *buffer = calloc(buff_len, 1);
if (buffer == NULL) { if (buffer == NULL) {
@ -2624,7 +2657,7 @@ static void* createTable(void *sarg)
return NULL; return NULL;
} }
len += snprintf(buffer + len, len += snprintf(buffer + len,
superTblInfo->maxSqlLen - len, buff_len - len,
"if not exists %s.%s%d using %s.%s tags %s ", "if not exists %s.%s%d using %s.%s tags %s ",
winfo->db_name, superTblInfo->childTblPrefix, winfo->db_name, superTblInfo->childTblPrefix,
i, winfo->db_name, i, winfo->db_name,
@ -2632,7 +2665,7 @@ static void* createTable(void *sarg)
free(tagsValBuf); free(tagsValBuf);
batchNum++; batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum) if ((batchNum < superTblInfo->batchCreateTableNum)
&& ((superTblInfo->maxSqlLen - len) && ((buff_len - len)
>= (superTblInfo->lenOfTagOfOneRow + 256))) { >= (superTblInfo->lenOfTagOfOneRow + 256))) {
continue; continue;
} }
@ -2970,7 +3003,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (countObj && countObj->type == cJSON_Number) { if (countObj && countObj->type == cJSON_Number) {
count = countObj->valueint; count = countObj->valueint;
} else if (countObj && countObj->type != cJSON_Number) { } else if (countObj && countObj->type != cJSON_Number) {
errorPrint("%s() LN%d, failed to read json, column count not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, column count not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
count = 1; count = 1;
@ -2979,8 +3013,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// column info // column info
memset(&columnCase, 0, sizeof(StrColumn)); memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(column, "type"); cJSON *dataType = cJSON_GetObjectItem(column, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { if (!dataType || dataType->type != cJSON_String
errorPrint("%s() LN%d: failed to read json, column type not found\n", __func__, __LINE__); || dataType->valuestring == NULL) {
errorPrint("%s() LN%d: failed to read json, column type not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
//tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE); //tstrncpy(superTbls->columns[k].dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
@ -2990,7 +3026,8 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (dataLen && dataLen->type == cJSON_Number) { if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint; columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) { } else if (dataLen && dataLen->type != cJSON_Number) {
debugPrint("%s() LN%d: failed to read json, column len not found\n", __func__, __LINE__); debugPrint("%s() LN%d: failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
columnCase.dataLen = 8; columnCase.dataLen = 8;
@ -3010,13 +3047,15 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// tags // tags
cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags"); cJSON *tags = cJSON_GetObjectItem(stbInfo, "tags");
if (!tags || tags->type != cJSON_Array) { if (!tags || tags->type != cJSON_Array) {
debugPrint("%s() LN%d, failed to read json, tags not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, tags not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
int tagSize = cJSON_GetArraySize(tags); int tagSize = cJSON_GetArraySize(tags);
if (tagSize > MAX_TAG_COUNT) { if (tagSize > MAX_TAG_COUNT) {
debugPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n", __func__, __LINE__, MAX_TAG_COUNT); errorPrint("%s() LN%d, failed to read json, tags size overflow, max tag size is %d\n",
__func__, __LINE__, MAX_TAG_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3039,8 +3078,10 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
// column info // column info
memset(&columnCase, 0, sizeof(StrColumn)); memset(&columnCase, 0, sizeof(StrColumn));
cJSON *dataType = cJSON_GetObjectItem(tag, "type"); cJSON *dataType = cJSON_GetObjectItem(tag, "type");
if (!dataType || dataType->type != cJSON_String || dataType->valuestring == NULL) { if (!dataType || dataType->type != cJSON_String
printf("ERROR: failed to read json, tag type not found\n"); || dataType->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, tag type not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE); tstrncpy(columnCase.dataType, dataType->valuestring, MAX_TB_NAME_SIZE);
@ -3049,14 +3090,16 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
if (dataLen && dataLen->type == cJSON_Number) { if (dataLen && dataLen->type == cJSON_Number) {
columnCase.dataLen = dataLen->valueint; columnCase.dataLen = dataLen->valueint;
} else if (dataLen && dataLen->type != cJSON_Number) { } else if (dataLen && dataLen->type != cJSON_Number) {
printf("ERROR: failed to read json, column len not found\n"); errorPrint("%s() LN%d, failed to read json, column len not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
columnCase.dataLen = 0; columnCase.dataLen = 0;
} }
for (int n = 0; n < count; ++n) { for (int n = 0; n < count; ++n) {
tstrncpy(superTbls->tags[index].dataType, columnCase.dataType, MAX_TB_NAME_SIZE); tstrncpy(superTbls->tags[index].dataType, columnCase.dataType,
MAX_TB_NAME_SIZE);
superTbls->tags[index].dataLen = columnCase.dataLen; superTbls->tags[index].dataLen = columnCase.dataLen;
index++; index++;
} }
@ -3066,9 +3109,6 @@ static bool getColumnAndTagTypeFromInsertJsonFile(
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret; return ret;
} }
@ -3145,7 +3185,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!gInsertInterval) { } else if (!gInsertInterval) {
g_args.insert_interval = 0; g_args.insert_interval = 0;
} else { } else {
errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, insert_interval input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3166,7 +3207,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!interlaceRows) { } else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, interlace_rows input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3176,7 +3218,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!maxSqlLen) { } else if (!maxSqlLen) {
g_args.max_sql_len = TSDB_PAYLOAD_SIZE; g_args.max_sql_len = TSDB_PAYLOAD_SIZE;
} else { } else {
errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, max_sql_len input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3186,7 +3229,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!numRecPerReq) { } else if (!numRecPerReq) {
g_args.num_of_RPR = 0xffff; g_args.num_of_RPR = 0xffff;
} else { } else {
errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, num_of_records_per_req not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3479,9 +3523,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (childTblExists if (childTblExists
&& childTblExists->type == cJSON_String && childTblExists->type == cJSON_String
&& childTblExists->valuestring != NULL) { && childTblExists->valuestring != NULL) {
if (0 == strncasecmp(childTblExists->valuestring, "yes", 3)) { if ((0 == strncasecmp(childTblExists->valuestring, "yes", 3))
&& (g_Dbs.db[i].drop == false)) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS; g_Dbs.db[i].superTbls[j].childTblExists = TBL_ALREADY_EXISTS;
} else if (0 == strncasecmp(childTblExists->valuestring, "no", 2)) { } else if ((0 == strncasecmp(childTblExists->valuestring, "no", 2)
|| (g_Dbs.db[i].drop == true))) {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS; g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
} else { } else {
g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS; g_Dbs.db[i].superTbls[j].childTblExists = TBL_NO_EXISTS;
@ -3510,7 +3556,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} else if (!dataSource) { } else if (!dataSource) {
tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE); tstrncpy(g_Dbs.db[i].superTbls[j].dataSource, "rand", MAX_DB_NAME_SIZE);
} else { } else {
errorPrint("%s() LN%d, failed to read json, data_source not found\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, data_source not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3527,18 +3574,20 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} }
cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit"); cJSON* childTbl_limit = cJSON_GetObjectItem(stbInfo, "childtable_limit");
if (childTbl_limit) { if ((childTbl_limit) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if (childTbl_limit->type != cJSON_Number) { if (childTbl_limit->type != cJSON_Number) {
printf("ERROR: failed to read json, childtable_limit\n"); printf("ERROR: failed to read json, childtable_limit\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint; g_Dbs.db[i].superTbls[j].childTblLimit = childTbl_limit->valueint;
} else { } else {
g_Dbs.db[i].superTbls[j].childTblLimit = -1; // select ... limit -1 means all query result g_Dbs.db[i].superTbls[j].childTblLimit = -1; // select ... limit -1 means all query result, drop = yes mean all table need recreate, limit value is invalid.
} }
cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset"); cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
if (childTbl_offset) { if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) { if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) {
printf("ERROR: failed to read json, childtable_offset\n"); printf("ERROR: failed to read json, childtable_offset\n");
goto PARSE_OVER; goto PARSE_OVER;
@ -3583,7 +3632,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} }
cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file"); cJSON *sampleFile = cJSON_GetObjectItem(stbInfo, "sample_file");
if (sampleFile && sampleFile->type == cJSON_String && sampleFile->valuestring != NULL) { if (sampleFile && sampleFile->type == cJSON_String
&& sampleFile->valuestring != NULL) {
tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile, tstrncpy(g_Dbs.db[i].superTbls[j].sampleFile,
sampleFile->valuestring, MAX_FILE_NAME_LEN); sampleFile->valuestring, MAX_FILE_NAME_LEN);
} else if (!sampleFile) { } else if (!sampleFile) {
@ -3726,9 +3776,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret; return ret;
} }
@ -3794,7 +3841,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!gQueryTimes) { } else if (!gQueryTimes) {
g_args.query_times = 1; g_args.query_times = 1;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3832,35 +3880,45 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
g_queryInfo.specifiedQueryInfo.rate = 0; g_queryInfo.specifiedQueryInfo.rate = 0;
} }
cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery, "query_times"); cJSON* specifiedQueryTimes = cJSON_GetObjectItem(specifiedQuery,
"query_times");
if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint; g_queryInfo.specifiedQueryInfo.queryTimes = specifiedQueryTimes->valueint;
} else if (!specifiedQueryTimes) { } else if (!specifiedQueryTimes) {
g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times; g_queryInfo.specifiedQueryInfo.queryTimes = g_args.query_times;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent");
if (concurrent && concurrent->type == cJSON_Number) { if (concurrent && concurrent->type == cJSON_Number) {
g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint; g_queryInfo.specifiedQueryInfo.concurrent = concurrent->valueint;
if (g_queryInfo.specifiedQueryInfo.concurrent <= 0) {
errorPrint("%s() LN%d, query sqlCount %d or concurrent %d is not correct.\n",
__func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount,
g_queryInfo.specifiedQueryInfo.concurrent);
goto PARSE_OVER;
}
} else if (!concurrent) { } else if (!concurrent) {
g_queryInfo.specifiedQueryInfo.concurrent = 1; g_queryInfo.specifiedQueryInfo.concurrent = 1;
} }
cJSON* mode = cJSON_GetObjectItem(specifiedQuery, "mode"); cJSON* queryMode = cJSON_GetObjectItem(specifiedQuery, "mode");
if (mode && mode->type == cJSON_String && mode->valuestring != NULL) { if (queryMode && queryMode->type == cJSON_String
if (0 == strcmp("sync", mode->valuestring)) { && queryMode->valuestring != NULL) {
g_queryInfo.specifiedQueryInfo.subscribeMode = 0; if (0 == strcmp("sync", queryMode->valuestring)) {
} else if (0 == strcmp("async", mode->valuestring)) { g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
g_queryInfo.specifiedQueryInfo.subscribeMode = 1; } else if (0 == strcmp("async", queryMode->valuestring)) {
g_queryInfo.specifiedQueryInfo.mode = ASYNC_QUERY_MODE;
} else { } else {
printf("ERROR: failed to read json, subscribe mod error\n"); errorPrint("%s() LN%d, failed to read json, query mode input error\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.subscribeMode = 0; g_queryInfo.specifiedQueryInfo.mode = SYNC_QUERY_MODE;
} }
cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval"); cJSON* interval = cJSON_GetObjectItem(specifiedQuery, "interval");
@ -3907,12 +3965,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!superSqls) { if (!superSqls) {
g_queryInfo.specifiedQueryInfo.sqlCount = 0; g_queryInfo.specifiedQueryInfo.sqlCount = 0;
} else if (superSqls->type != cJSON_Array) { } else if (superSqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n"); errorPrint("%s() LN%d, failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
int superSqlSize = cJSON_GetArraySize(superSqls); int superSqlSize = cJSON_GetArraySize(superSqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) { if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3964,7 +4024,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} else if (!superQueryTimes) { } else if (!superQueryTimes) {
g_queryInfo.superQueryInfo.queryTimes = g_args.query_times; g_queryInfo.superQueryInfo.queryTimes = g_args.query_times;
} else { } else {
errorPrint("%s() LN%d, failed to read json, query_times input mistake\n", __func__, __LINE__); errorPrint("%s() LN%d, failed to read json, query_times input mistake\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -3983,25 +4044,30 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
//} //}
cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname"); cJSON* stblname = cJSON_GetObjectItem(superQuery, "stblname");
if (stblname && stblname->type == cJSON_String && stblname->valuestring != NULL) { if (stblname && stblname->type == cJSON_String
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring, MAX_TB_NAME_SIZE); && stblname->valuestring != NULL) {
tstrncpy(g_queryInfo.superQueryInfo.sTblName, stblname->valuestring,
MAX_TB_NAME_SIZE);
} else { } else {
printf("ERROR: failed to read json, super table name not found\n"); errorPrint("%s() LN%d, failed to read json, super table name input error\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* submode = cJSON_GetObjectItem(superQuery, "mode"); cJSON* submode = cJSON_GetObjectItem(superQuery, "mode");
if (submode && submode->type == cJSON_String && submode->valuestring != NULL) { if (submode && submode->type == cJSON_String
&& submode->valuestring != NULL) {
if (0 == strcmp("sync", submode->valuestring)) { if (0 == strcmp("sync", submode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 0; g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
} else if (0 == strcmp("async", submode->valuestring)) { } else if (0 == strcmp("async", submode->valuestring)) {
g_queryInfo.superQueryInfo.subscribeMode = 1; g_queryInfo.superQueryInfo.mode = ASYNC_QUERY_MODE;
} else { } else {
printf("ERROR: failed to read json, subscribe mod error\n"); errorPrint("%s() LN%d, failed to read json, query mode input error\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} else { } else {
g_queryInfo.superQueryInfo.subscribeMode = 0; g_queryInfo.superQueryInfo.mode = SYNC_QUERY_MODE;
} }
cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval"); cJSON* subinterval = cJSON_GetObjectItem(superQuery, "interval");
@ -4014,7 +4080,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
} }
cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart"); cJSON* subrestart = cJSON_GetObjectItem(superQuery, "restart");
if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) { if (subrestart && subrestart->type == cJSON_String
&& subrestart->valuestring != NULL) {
if (0 == strcmp("yes", subrestart->valuestring)) { if (0 == strcmp("yes", subrestart->valuestring)) {
g_queryInfo.superQueryInfo.subscribeRestart = 1; g_queryInfo.superQueryInfo.subscribeRestart = 1;
} else if (0 == strcmp("no", subrestart->valuestring)) { } else if (0 == strcmp("no", subrestart->valuestring)) {
@ -4048,12 +4115,14 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (!subsqls) { if (!subsqls) {
g_queryInfo.superQueryInfo.sqlCount = 0; g_queryInfo.superQueryInfo.sqlCount = 0;
} else if (subsqls->type != cJSON_Array) { } else if (subsqls->type != cJSON_Array) {
printf("ERROR: failed to read json, super sqls not found\n"); errorPrint("%s() LN%d: failed to read json, super sqls not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} else { } else {
int superSqlSize = cJSON_GetArraySize(subsqls); int superSqlSize = cJSON_GetArraySize(subsqls);
if (superSqlSize > MAX_QUERY_SQL_COUNT) { if (superSqlSize > MAX_QUERY_SQL_COUNT) {
printf("ERROR: failed to read json, query sql size overflow, max is %d\n", MAX_QUERY_SQL_COUNT); errorPrint("%s() LN%d, failed to read json, query sql size overflow, max is %d\n",
__func__, __LINE__, MAX_QUERY_SQL_COUNT);
goto PARSE_OVER; goto PARSE_OVER;
} }
@ -4063,19 +4132,25 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
if (sql == NULL) continue; if (sql == NULL) continue;
cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql"); cJSON *sqlStr = cJSON_GetObjectItem(sql, "sql");
if (!sqlStr || sqlStr->type != cJSON_String || sqlStr->valuestring == NULL) { if (!sqlStr || sqlStr->type != cJSON_String
printf("ERROR: failed to read json, sql not found\n"); || sqlStr->valuestring == NULL) {
errorPrint("%s() LN%d, failed to read json, sql not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring,
MAX_QUERY_SQL_LENGTH);
cJSON *result = cJSON_GetObjectItem(sql, "result"); cJSON *result = cJSON_GetObjectItem(sql, "result");
if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){ if (result != NULL && result->type == cJSON_String
tstrncpy(g_queryInfo.superQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); && result->valuestring != NULL){
tstrncpy(g_queryInfo.superQueryInfo.result[j],
result->valuestring, MAX_FILE_NAME_LEN);
} else if (NULL == result) { } else if (NULL == result) {
memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); memset(g_queryInfo.superQueryInfo.result[j], 0, MAX_FILE_NAME_LEN);
} else { } else {
printf("ERROR: failed to read json, sub query result file not found\n"); errorPrint("%s() LN%d, failed to read json, sub query result file not found\n",
__func__, __LINE__);
goto PARSE_OVER; goto PARSE_OVER;
} }
} }
@ -4085,9 +4160,6 @@ static bool getMetaFromQueryJsonFile(cJSON* root) {
ret = true; ret = true;
PARSE_OVER: PARSE_OVER:
//free(content);
//cJSON_Delete(root);
//fclose(fp);
return ret; return ret;
} }
@ -4449,21 +4521,22 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
} else if (0 == strncasecmp(superTblInfo->dataSource, } else if (0 == strncasecmp(superTblInfo->dataSource,
"rand", strlen("rand"))) { "rand", strlen("rand"))) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
int randTail;
if (0 != superTblInfo->disorderRatio if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) { && rand_num < superTblInfo->disorderRatio) {
int64_t d = startTime randTail = (superTblInfo->timeStampStep * k
+ superTblInfo->timeStampStep * k + (taosRandom() % superTblInfo->disorderRange + 1)) * (-1);
- taosRandom() % superTblInfo->disorderRange; debugPrint("rand data generated, back %d\n", randTail);
} else {
randTail = superTblInfo->timeStampStep * k;
}
uint64_t d = startTime
+ randTail;
retLen = generateRowData( retLen = generateRowData(
data, data,
d, d,
superTblInfo); superTblInfo);
} else {
retLen = generateRowData(
data,
startTime + superTblInfo->timeStampStep * k,
superTblInfo);
}
} }
if (retLen > remainderBufLen) { if (retLen > remainderBufLen) {
@ -4479,20 +4552,21 @@ static int generateDataTail(char *tableName, int32_t tableSeq,
int lenOfBinary = g_args.len_of_binary; int lenOfBinary = g_args.len_of_binary;
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
int randTail;
if ((g_args.disorderRatio != 0) if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRatio)) { && (rand_num < g_args.disorderRatio)) {
randTail = (DEFAULT_TIMESTAMP_STEP * k
int64_t d = startTime + DEFAULT_TIMESTAMP_STEP * k + (taosRandom() % g_args.disorderRange + 1)) * (-1);
- taosRandom() % g_args.disorderRange; debugPrint("rand data generated, back %d\n", randTail);
retLen = generateData(data, data_type,
ncols_per_record, d, lenOfBinary);
} else { } else {
randTail = DEFAULT_TIMESTAMP_STEP * k;
}
retLen = generateData(data, data_type, retLen = generateData(data, data_type,
ncols_per_record, ncols_per_record,
startTime + DEFAULT_TIMESTAMP_STEP * k, startTime + randTail,
lenOfBinary); lenOfBinary);
}
if (len > remainderBufLen) if (len > remainderBufLen)
break; break;
@ -5034,7 +5108,7 @@ static void callBack(void *param, TAOS_RES *res, int code) {
int rand_num = taosRandom() % 100; int rand_num = taosRandom() % 100;
if (0 != winfo->superTblInfo->disorderRatio if (0 != winfo->superTblInfo->disorderRatio
&& rand_num < winfo->superTblInfo->disorderRatio) { && rand_num < winfo->superTblInfo->disorderRatio) {
int64_t d = winfo->lastTs - taosRandom() % winfo->superTblInfo->disorderRange; int64_t d = winfo->lastTs - (taosRandom() % winfo->superTblInfo->disorderRange + 1);
generateRowData(data, d, winfo->superTblInfo); generateRowData(data, d, winfo->superTblInfo);
} else { } else {
generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo); generateRowData(data, winfo->lastTs += 1000, winfo->superTblInfo);
@ -5164,13 +5238,15 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int limit, offset; int limit, offset;
if ((superTblInfo->childTblExists == TBL_NO_EXISTS) && if ((superTblInfo->childTblExists == TBL_NO_EXISTS) &&
((superTblInfo->childTblOffset != 0) || (superTblInfo->childTblLimit != 0))) { ((superTblInfo->childTblOffset != 0) || (superTblInfo->childTblLimit >= 0))) {
printf("WARNING: offset and limit will not be used since the child tables are not exists!\n"); printf("WARNING: offset and limit will not be used since the child tables are not exists!\n");
} }
if ((superTblInfo->childTblExists == TBL_ALREADY_EXISTS) if ((superTblInfo->childTblExists == TBL_ALREADY_EXISTS)
&& (superTblInfo->childTblOffset >= 0)) { && (superTblInfo->childTblOffset >= 0)) {
if (superTblInfo->childTblLimit < 0) { if ((superTblInfo->childTblLimit < 0)
|| ((superTblInfo->childTblOffset + superTblInfo->childTblLimit)
> (superTblInfo->childTblCount))) {
superTblInfo->childTblLimit = superTblInfo->childTblLimit =
superTblInfo->childTblCount - superTblInfo->childTblOffset; superTblInfo->childTblCount - superTblInfo->childTblOffset;
} }
@ -5599,7 +5675,7 @@ static int insertTestProcess() {
return 0; return 0;
} }
static void *superQueryProcess(void *sarg) { static void *specifiedQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
if (winfo->taos == NULL) { if (winfo->taos == NULL) {
@ -5640,22 +5716,25 @@ static void *superQueryProcess(void *sarg) {
} }
st = taosGetTimestampUs(); st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[winfo->querySeq][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[winfo->querySeq],
winfo->threadID);
} }
selectAndGetResult(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], tmpFile); selectAndGetResult(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq], tmpFile);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else { } else {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host, int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port, g_queryInfo.specifiedQueryInfo.sql[i]); g_queryInfo.port,
g_queryInfo.specifiedQueryInfo.sql[winfo->querySeq]);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
@ -5665,7 +5744,7 @@ static void *superQueryProcess(void *sarg) {
return NULL; return NULL;
} }
} }
}
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0); taosGetSelfPthreadId(), (double)(et - st)/1000.0);
@ -5695,7 +5774,7 @@ static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
//printf("3: %s\n", outSql); //printf("3: %s\n", outSql);
} }
static void *subQueryProcess(void *sarg) { static void *superQueryProcess(void *sarg) {
char sqlstr[1024]; char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
@ -5788,24 +5867,24 @@ static int queryTestProcess() {
pthread_t *pids = NULL; pthread_t *pids = NULL;
threadInfo *infos = NULL; threadInfo *infos = NULL;
//==== create sub threads for query from specify table //==== create sub threads for query from specify table
if (g_queryInfo.specifiedQueryInfo.sqlCount > 0 int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
&& g_queryInfo.specifiedQueryInfo.concurrent > 0) { int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); if ((nSqlCount > 0) && (nConcurrent > 0)) {
if (NULL == pids) {
pids = malloc(nConcurrent * nSqlCount * sizeof(pthread_t));
infos = malloc(nConcurrent * nSqlCount * sizeof(threadInfo));
if ((NULL == pids) || (NULL == infos)) {
taos_close(taos); taos_close(taos);
ERROR_EXIT("memory allocation failed\n");
}
infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo));
if (NULL == infos) {
taos_close(taos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { for (int i = 0; i < nConcurrent; i++) {
threadInfo *t_info = infos + i; for (int j = 0; j < nSqlCount; j++) {
t_info->threadID = i; threadInfo *t_info = infos + i * nSqlCount + j;
t_info->threadID = i * nSqlCount + j;
t_info->querySeq = j;
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
@ -5824,7 +5903,9 @@ static int queryTestProcess() {
t_info->taos = NULL;// TODO: workaround to use separate taos connection; t_info->taos = NULL;// TODO: workaround to use separate taos connection;
pthread_create(pids + i, NULL, superQueryProcess, t_info); pthread_create(pids + i * nSqlCount + j, NULL, specifiedQueryProcess,
t_info);
}
} }
} else { } else {
g_queryInfo.specifiedQueryInfo.concurrent = 0; g_queryInfo.specifiedQueryInfo.concurrent = 0;
@ -5838,18 +5919,12 @@ static int queryTestProcess() {
if ((g_queryInfo.superQueryInfo.sqlCount > 0) if ((g_queryInfo.superQueryInfo.sqlCount > 0)
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) { && (g_queryInfo.superQueryInfo.threadCnt > 0)) {
pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t));
if (NULL == pidsOfSub) {
free(infos);
free(pids);
ERROR_EXIT("memory allocation failed for create threads\n");
}
infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo));
if (NULL == infosOfSub) {
free(pidsOfSub); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) {
free(infos); free(infos);
free(pids); free(pids);
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
@ -5877,7 +5952,7 @@ static int queryTestProcess() {
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
startFrom = t_info->end_table_to + 1; startFrom = t_info->end_table_to + 1;
t_info->taos = NULL; // TODO: workaround to use separate taos connection; t_info->taos = NULL; // TODO: workaround to use separate taos connection;
pthread_create(pidsOfSub + i, NULL, subQueryProcess, t_info); pthread_create(pidsOfSub + i, NULL, superQueryProcess, t_info);
} }
g_queryInfo.superQueryInfo.threadCnt = threads; g_queryInfo.superQueryInfo.threadCnt = threads;
@ -5885,8 +5960,12 @@ static int queryTestProcess() {
g_queryInfo.superQueryInfo.threadCnt = 0; g_queryInfo.superQueryInfo.threadCnt = 0;
} }
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { if ((nSqlCount > 0) && (nConcurrent > 0)) {
pthread_join(pids[i], NULL); for (int i = 0; i < nConcurrent; i++) {
for (int j = 0; j < nSqlCount; j++) {
pthread_join(pids[i * nSqlCount + j], NULL);
}
}
} }
tmfree((char*)pids); tmfree((char*)pids);
@ -5917,7 +5996,7 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c
static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) { static TAOS_SUB* subscribeImpl(TAOS *taos, char *sql, char* topic, char* resultFileName) {
TAOS_SUB* tsub = NULL; TAOS_SUB* tsub = NULL;
if (g_queryInfo.specifiedQueryInfo.subscribeMode) { if (g_queryInfo.specifiedQueryInfo.mode) {
tsub = taos_subscribe(taos, tsub = taos_subscribe(taos,
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.subscribeRestart,
topic, sql, subscribe_callback, (void*)resultFileName, topic, sql, subscribe_callback, (void*)resultFileName,
@ -5999,7 +6078,7 @@ static void *subSubscribeProcess(void *sarg) {
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.superQueryInfo.subscribeMode) { if (1 == g_queryInfo.superQueryInfo.mode) {
continue; continue;
} }
@ -6070,7 +6149,8 @@ static void *superSubscribeProcess(void *sarg) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID); g_queryInfo.specifiedQueryInfo.result[i], winfo->threadID);
} }
tsub[i] = subscribeImpl(winfo->taos, g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); tsub[i] = subscribeImpl(winfo->taos,
g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) { if (NULL == g_queryInfo.specifiedQueryInfo.tsub[i]) {
taos_close(winfo->taos); taos_close(winfo->taos);
return NULL; return NULL;
@ -6084,7 +6164,7 @@ static void *superSubscribeProcess(void *sarg) {
TAOS_RES* res = NULL; TAOS_RES* res = NULL;
while(1) { while(1) {
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
if (1 == g_queryInfo.specifiedQueryInfo.subscribeMode) { if (SYNC_QUERY_MODE == g_queryInfo.specifiedQueryInfo.mode) {
continue; continue;
} }
@ -6102,7 +6182,8 @@ static void *superSubscribeProcess(void *sarg) {
taos_free_result(res); taos_free_result(res);
for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) {
taos_unsubscribe(tsub[i], g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); taos_unsubscribe(tsub[i],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
} }
taos_close(winfo->taos); taos_close(winfo->taos);
@ -6305,7 +6386,7 @@ static void setParaFromArg(){
g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables; g_Dbs.db[0].superTbls[0].childTblCount = g_args.num_of_tables;
g_Dbs.threadCount = g_args.num_of_threads; g_Dbs.threadCount = g_args.num_of_threads;
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.queryMode = g_args.mode; g_Dbs.queryMode = g_args.query_mode;
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
@ -6470,12 +6551,11 @@ static void testMetaFile() {
} }
static void queryResult() { static void queryResult() {
// select
if (false == g_Dbs.insert_only) {
// query data // query data
pthread_t read_id; pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo)); threadInfo *rInfo = malloc(sizeof(threadInfo));
assert(rInfo);
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000 rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_from = 0; rInfo->start_table_from = 0;
@ -6516,7 +6596,6 @@ static void queryResult() {
taos_close(rInfo->taos); taos_close(rInfo->taos);
free(rInfo); free(rInfo);
} }
}
static void testCmdLine() { static void testCmdLine() {
@ -6533,9 +6612,7 @@ static void testCmdLine() {
g_args.test_mode = INSERT_TEST; g_args.test_mode = INSERT_TEST;
insertTestProcess(); insertTestProcess();
if (g_Dbs.insert_only) if (false == g_Dbs.insert_only)
return;
else
queryResult(); queryResult();
} }

View File

@ -315,6 +315,10 @@ void sdbUpdateAsync() {
taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsSdbTmr); taosTmrReset(sdbUpdateSyncTmrFp, 200, NULL, tsMnodeTmr, &tsSdbTmr);
} }
static int node_cmp(const void *l, const void *r) {
return ((SNodeInfo *)l)->nodeId - ((SNodeInfo *)r)->nodeId;
}
int32_t sdbUpdateSync(void *pMnodes) { int32_t sdbUpdateSync(void *pMnodes) {
SMInfos *pMinfos = pMnodes; SMInfos *pMinfos = pMnodes;
if (!mnodeIsRunning()) { if (!mnodeIsRunning()) {
@ -382,6 +386,8 @@ int32_t sdbUpdateSync(void *pMnodes) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qsort(syncCfg.nodeInfo, syncCfg.replica, sizeof(syncCfg.nodeInfo[0]), node_cmp);
sdbInfo("vgId:1, work as mnode, replica:%d", syncCfg.replica); sdbInfo("vgId:1, work as mnode, replica:%d", syncCfg.replica);
for (int32_t i = 0; i < syncCfg.replica; ++i) { for (int32_t i = 0; i < syncCfg.replica; ++i) {
sdbInfo("vgId:1, mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn, sdbInfo("vgId:1, mnode:%d, %s:%d", syncCfg.nodeInfo[i].nodeId, syncCfg.nodeInfo[i].nodeFqdn,

View File

@ -190,6 +190,8 @@ typedef struct SQuery {
bool stabledev; // super table stddev query bool stabledev; // super table stddev query
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
SOrderVal order; SOrderVal order;
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
@ -285,6 +287,7 @@ enum OPERATOR_TYPE_E {
OP_Fill = 13, OP_Fill = 13,
OP_MultiTableAggregate = 14, OP_MultiTableAggregate = 14,
OP_MultiTableTimeInterval = 15, OP_MultiTableTimeInterval = 15,
OP_Having = 16,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
@ -402,6 +405,11 @@ typedef struct SOffsetOperatorInfo {
int64_t offset; int64_t offset;
} SOffsetOperatorInfo; } SOffsetOperatorInfo;
typedef struct SHavingOperatorInfo {
SArray* fp;
} SHavingOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo; SFillInfo *pFillInfo;
SSDataBlock *pRes; SSDataBlock *pRes;

View File

@ -98,6 +98,7 @@ typedef struct SQuerySqlNode {
SLimitVal limit; // limit offset [optional] SLimitVal limit; // limit offset [optional]
SLimitVal slimit; // group limit offset [optional] SLimitVal slimit; // group limit offset [optional]
SStrToken sqlstr; // sql string in select clause SStrToken sqlstr; // sql string in select clause
struct tSqlExpr *pHaving; // having clause [optional]
} SQuerySqlNode; } SQuerySqlNode;
typedef struct STableNamePair { typedef struct STableNamePair {
@ -253,6 +254,11 @@ SArray *tVariantListAppend(SArray *pList, tVariant *pVar, uint8_t sortOrder);
SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index); SArray *tVariantListInsert(SArray *pList, tVariant *pVar, uint8_t sortOrder, int32_t index);
SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder); SArray *tVariantListAppendToken(SArray *pList, SStrToken *pAliasToken, uint8_t sortOrder);
tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType);
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
tSqlExpr *tSqlExprClone(tSqlExpr *pSrc);
SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias); SFromInfo *setTableNameList(SFromInfo* pFromInfo, SStrToken *pName, SStrToken* pAlias);
SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode *pSqlNode); SFromInfo *setSubquery(SFromInfo* pFromInfo, SQuerySqlNode *pSqlNode);
void *destroyFromInfo(SFromInfo* pFromInfo); void *destroyFromInfo(SFromInfo* pFromInfo);
@ -272,7 +278,7 @@ void tSqlExprListDestroy(SArray *pList);
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere, SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit); SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type); SCreateTableSql *tSetCreateTableInfo(SArray *pCols, SArray *pTags, SQuerySqlNode *pSelect, int32_t type);

View File

@ -52,10 +52,19 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int
return pResultRowInfo->pResult[slot]; return pResultRowInfo->pResult[slot];
} }
static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) { static FORCE_INLINE char* getPosInResultPage(SQueryRuntimeEnv* pRuntimeEnv, tFilePage* page, int32_t rowOffset,
assert(rowOffset >= 0 && pQuery != NULL); int16_t offset, int32_t size) {
assert(rowOffset >= 0 && pRuntimeEnv != NULL);
SQuery* pQuery = pRuntimeEnv->pQuery;
int64_t pageSize = pRuntimeEnv->pResultBuf->pageSize;
int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery); int32_t numOfRows = (int32_t)GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
// buffer overflow check
int64_t bufEnd = (rowOffset + offset * numOfRows + size);
assert(page->num <= pageSize && bufEnd <= page->num);
return ((char*)page->data) + rowOffset + offset * numOfRows; return ((char*)page->data) + rowOffset + offset * numOfRows;
} }

View File

@ -453,7 +453,7 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
%type select {SQuerySqlNode*} %type select {SQuerySqlNode*}
%destructor select {destroyQuerySqlNode($$);} %destructor select {destroyQuerySqlNode($$);}
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). { select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G); A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N);
} }
select(A) ::= LP select(B) RP. {A = B;} select(A) ::= LP select(B) RP. {A = B;}
@ -471,7 +471,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
// select client_version() // select client_version()
// select server_state() // select server_state()
select(A) ::= SELECT(T) selcollist(W). { select(A) ::= SELECT(T) selcollist(W). {
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL); A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
} }
// selcollist is a list of expressions that are to become the return // selcollist is a list of expressions that are to become the return

View File

@ -2771,14 +2771,16 @@ static void percentile_function(SQLFunctionCtx *pCtx) {
SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); SPercentileInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed // all data are null, set it completed
if (pInfo->numOfElems == 0) { if (pInfo->numOfElems == 0) {
pResInfo->complete = true; pResInfo->complete = true;
return;
} else { } else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
} }
pInfo->stage += 1;
} }
// the first stage, only acquire the min/max value // the first stage, only acquire the min/max value
@ -2857,14 +2859,16 @@ static void percentile_function_f(SQLFunctionCtx *pCtx, int32_t index) {
SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo); SPercentileInfo *pInfo = (SPercentileInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) { if (pCtx->currentStage == REPEAT_SCAN && pInfo->stage == 0) {
pInfo->stage += 1;
// all data are null, set it completed // all data are null, set it completed
if (pInfo->numOfElems == 0) { if (pInfo->numOfElems == 0) {
pResInfo->complete = true; pResInfo->complete = true;
return;
} else { } else {
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval); pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
} }
pInfo->stage += 1;
} }
if (pInfo->stage == 0) { if (pInfo->stage == 0) {

View File

@ -181,6 +181,7 @@ static SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntime
static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); static SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput);
static SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); static SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
static SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
@ -1819,6 +1820,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
} }
} }
if (pQuery->havingNum > 0) {
pRuntimeEnv->proot = createHavingOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQuery->pExpr1, pQuery->numOfOutput);
}
if (pQuery->limit.offset > 0) { if (pQuery->limit.offset > 0) {
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
} }
@ -3243,7 +3248,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
continue; continue;
} }
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, bufPage, pResult->offset, offset); pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, bufPage, pResult->offset, offset, pCtx[i].outputBytes);
offset += pCtx[i].outputBytes; offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId; int32_t functionId = pCtx[i].functionId;
@ -3301,7 +3306,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
int16_t offset = 0; int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, page, pResult->offset, offset); pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv, page, pResult->offset, offset, pCtx[i].outputBytes);
offset += pCtx[i].outputBytes; offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId; int32_t functionId = pCtx[i].functionId;
@ -3509,8 +3514,6 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
*/ */
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) { static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
@ -3545,7 +3548,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
int32_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
char *out = pColInfoData->pData + numOfResult * bytes; char *out = pColInfoData->pData + numOfResult * bytes;
char *in = getPosInResultPage(pQuery, page, pRow->offset, offset); char *in = getPosInResultPage(pRuntimeEnv, page, pRow->offset, offset, bytes);
memcpy(out, in, bytes * numOfRowsToCopy); memcpy(out, in, bytes * numOfRowsToCopy);
offset += bytes; offset += bytes;
@ -4015,7 +4018,7 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol; return pFillCol;
} }
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *tsdb, int32_t vgId, bool isSTableQuery) { int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
@ -4026,8 +4029,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery);
pQuery->stabledev = isStabledev(pQuery); pQuery->stabledev = isStabledev(pQuery);
pRuntimeEnv->prevResult = prevResult;
setScanLimitationByResultBuffer(pQuery); setScanLimitationByResultBuffer(pQuery);
int32_t code = setupQueryHandle(tsdb, pQInfo, isSTableQuery); int32_t code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
@ -4669,6 +4670,111 @@ static SSDataBlock* doOffset(void* param) {
} }
} }
bool doFilterData(SColumnInfoData* p, int32_t rid, SColumnFilterElem *filterElem, __filter_func_t fp) {
char* input = p->pData + p->info.bytes * rid;
bool isnull = isNull(input, p->info.type);
if (isnull) {
return (fp == isNullOperator) ? true : false;
} else {
if (fp == notNullOperator) {
return true;
} else if (fp == isNullOperator) {
return false;
}
}
if (fp(filterElem, input, input, p->info.type)) {
return true;
}
return false;
}
void doHavingImpl(SOperatorInfo *pOperator, SSDataBlock *pBlock) {
SHavingOperatorInfo* pInfo = pOperator->info;
int32_t f = 0;
int32_t allQualified = 1;
int32_t exprQualified = 0;
for (int32_t r = 0; r < pBlock->info.rows; ++r) {
allQualified = 1;
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
SExprInfo* pExprInfo = &(pOperator->pExpr[i]);
if (pExprInfo->pFilter == NULL) {
continue;
}
SArray* es = taosArrayGetP(pInfo->fp, i);
assert(es);
size_t fpNum = taosArrayGetSize(es);
exprQualified = 0;
for (int32_t m = 0; m < fpNum; ++m) {
__filter_func_t fp = taosArrayGetP(es, m);
assert(fp);
//SColIndex* colIdx = &pExprInfo->base.colInfo;
SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, i);
SColumnFilterElem filterElem = {.filterInfo = pExprInfo->pFilter[m]};
if (doFilterData(p, r, &filterElem, fp)) {
exprQualified = 1;
break;
}
}
if (exprQualified == 0) {
allQualified = 0;
break;
}
}
if (allQualified == 0) {
continue;
}
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes;
memmove(pColInfoData->pData + f * bytes, pColInfoData->pData + bytes * r, bytes);
}
++f;
}
pBlock->info.rows = f;
}
static SSDataBlock* doHaving(void* param) {
SOperatorInfo *pOperator = (SOperatorInfo *)param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream);
if (pBlock == NULL) {
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
doHavingImpl(pOperator, pBlock);
return pBlock;
}
}
static SSDataBlock* doIntervalAgg(void* param) { static SSDataBlock* doIntervalAgg(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
@ -5019,6 +5125,13 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
} }
static void destroyHavingOperatorInfo(void* param, int32_t numOfOutput) {
SHavingOperatorInfo* pInfo = (SHavingOperatorInfo*) param;
if (pInfo->fp) {
taosArrayDestroy(pInfo->fp);
}
}
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
@ -5075,6 +5188,83 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
return pOperator; return pOperator;
} }
int32_t initFilterFp(SExprInfo* pExpr, int32_t numOfOutput, SArray** fps) {
__filter_func_t fp = NULL;
*fps = taosArrayInit(numOfOutput, sizeof(SArray*));
if (*fps == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &(pExpr[i]);
SColIndex* colIdx = &pExprInfo->base.colInfo;
if (pExprInfo->pFilter == NULL || !TSDB_COL_IS_NORMAL_COL(colIdx->flag)) {
taosArrayPush(*fps, &fp);
continue;
}
int32_t filterNum = pExprInfo->base.filterNum;
SColumnFilterInfo *filterInfo = pExprInfo->pFilter;
SArray* es = taosArrayInit(filterNum, sizeof(__filter_func_t));
for (int32_t j = 0; j < filterNum; ++j) {
int32_t lower = filterInfo->lowerRelOptr;
int32_t upper = filterInfo->upperRelOptr;
if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) {
qError("invalid rel optr");
taosArrayDestroy(es);
return TSDB_CODE_QRY_APP_ERROR;
}
__filter_func_t ffp = getFilterOperator(lower, upper);
if (ffp == NULL) {
qError("invalid filter info");
taosArrayDestroy(es);
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayPush(es, &ffp);
filterInfo += 1;
}
taosArrayPush(*fps, &es);
}
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createHavingOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SHavingOperatorInfo* pInfo = calloc(1, sizeof(SHavingOperatorInfo));
initFilterFp(pExpr, numOfOutput, &pInfo->fp);
assert(pInfo->fp);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "HavingOperator";
pOperator->operatorType = OP_Having;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr;
pOperator->upstream = upstream;
pOperator->exec = doHaving;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->cleanup = destroyHavingOperatorInfo;
return pOperator;
}
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) {
SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo));
pInfo->limit = pRuntimeEnv->pQuery->limit.limit; pInfo->limit = pRuntimeEnv->pQuery->limit.limit;
@ -5646,9 +5836,35 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
pExprMsg->functionId = htons(pExprMsg->functionId); pExprMsg->functionId = htons(pExprMsg->functionId);
pExprMsg->numOfParams = htons(pExprMsg->numOfParams); pExprMsg->numOfParams = htons(pExprMsg->numOfParams);
pExprMsg->resColId = htons(pExprMsg->resColId); pExprMsg->resColId = htons(pExprMsg->resColId);
pExprMsg->filterNum = htonl(pExprMsg->filterNum);
pMsg += sizeof(SSqlFuncMsg); pMsg += sizeof(SSqlFuncMsg);
SColumnFilterInfo* pExprFilterInfo = pExprMsg->filterInfo;
pMsg += sizeof(SColumnFilterInfo) * pExprMsg->filterNum;
for (int32_t f = 0; f < pExprMsg->filterNum; ++f) {
SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)pExprFilterInfo;
pFilterMsg->filterstr = htons(pFilterMsg->filterstr);
if (pFilterMsg->filterstr) {
pFilterMsg->len = htobe64(pFilterMsg->len);
pFilterMsg->pz = (int64_t)pMsg;
pMsg += (pFilterMsg->len + 1);
} else {
pFilterMsg->lowerBndi = htobe64(pFilterMsg->lowerBndi);
pFilterMsg->upperBndi = htobe64(pFilterMsg->upperBndi);
}
pFilterMsg->lowerRelOptr = htons(pFilterMsg->lowerRelOptr);
pFilterMsg->upperRelOptr = htons(pFilterMsg->upperRelOptr);
pExprFilterInfo++;
}
for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) { for (int32_t j = 0; j < pExprMsg->numOfParams; ++j) {
pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType); pExprMsg->arg[j].argType = htons(pExprMsg->arg[j].argType);
pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes); pExprMsg->arg[j].argBytes = htons(pExprMsg->arg[j].argBytes);
@ -5833,6 +6049,42 @@ _cleanup:
return code; return code;
} }
int32_t cloneExprFilterInfo(SColumnFilterInfo **dst, SColumnFilterInfo* src, int32_t filterNum) {
if (filterNum <= 0) {
return TSDB_CODE_SUCCESS;
}
*dst = calloc(filterNum, sizeof(*src));
if (*dst == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(*dst, src, sizeof(*src) * filterNum);
for (int32_t i = 0; i < filterNum; i++) {
if ((*dst)[i].filterstr && dst[i]->len > 0) {
void *pz = calloc(1, (size_t)(*dst)[i].len + 1);
if (pz == NULL) {
if (i == 0) {
free(*dst);
} else {
freeColumnFilterInfo(*dst, i);
}
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(pz, (void *)src->pz, (size_t)src->len + 1);
(*dst)[i].pz = (int64_t)pz;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) { static int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, SQueryTableMsg *pQueryMsg) {
qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg); qDebug("qmsg:%p create arithmetic expr from binary", pQueryMsg);
@ -5946,6 +6198,13 @@ int32_t createQueryFuncExprFromMsg(SQueryTableMsg* pQueryMsg, int32_t numOfOutpu
type = s->type; type = s->type;
bytes = s->bytes; bytes = s->bytes;
} }
if (pExprs[i].base.filterNum > 0) {
int32_t ret = cloneExprFilterInfo(&pExprs[i].pFilter, pExprMsg[i]->filterInfo, pExprMsg[i]->filterNum);
if (ret) {
return ret;
}
}
} }
int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64; int32_t param = (int32_t)pExprs[i].base.arg[0].argValue.i64;
@ -6235,6 +6494,10 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) { if (TSDB_COL_IS_TAG(pExprs[col].base.colInfo.flag)) {
pQuery->tagLen += pExprs[col].bytes; pQuery->tagLen += pExprs[col].bytes;
} }
if (pExprs[col].pFilter) {
++pQuery->havingNum;
}
} }
doUpdateExprColumnIndex(pQuery); doUpdateExprColumnIndex(pQuery);
@ -6338,6 +6601,10 @@ _cleanup_qinfo:
tExprTreeDestroy(pExprInfo->pExpr, NULL); tExprTreeDestroy(pExprInfo->pExpr, NULL);
pExprInfo->pExpr = NULL; pExprInfo->pExpr = NULL;
} }
if (pExprInfo->pFilter) {
freeColumnFilterInfo(pExprInfo->pFilter, pExprInfo->base.filterNum);
}
} }
tfree(pExprs); tfree(pExprs);
@ -6383,6 +6650,8 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
SArray* prevResult = NULL; SArray* prevResult = NULL;
if (pQueryMsg->prevResultLen > 0) { if (pQueryMsg->prevResultLen > 0) {
prevResult = interResFromBinary(param->prevResult, pQueryMsg->prevResultLen); prevResult = interResFromBinary(param->prevResult, pQueryMsg->prevResultLen);
pRuntimeEnv->prevResult = prevResult;
} }
pQuery->precision = tsdbGetCfg(tsdb)->precision; pQuery->precision = tsdbGetCfg(tsdb)->precision;
@ -6404,7 +6673,7 @@ int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *p
} }
// filter the qualified // filter the qualified
if ((code = doInitQInfo(pQInfo, pTsBuf, prevResult, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) { if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -6422,7 +6691,7 @@ void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
} }
for (int32_t i = 0; i < numOfFilters; i++) { for (int32_t i = 0; i < numOfFilters; i++) {
if (pFilter[i].filterstr) { if (pFilter[i].filterstr && pFilter[i].pz) {
free((void*)(pFilter[i].pz)); free((void*)(pFilter[i].pz));
} }
} }
@ -6464,6 +6733,10 @@ static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) {
if (pExprInfo[i].pExpr != NULL) { if (pExprInfo[i].pExpr != NULL) {
tExprTreeDestroy(pExprInfo[i].pExpr, NULL); tExprTreeDestroy(pExprInfo[i].pExpr, NULL);
} }
if (pExprInfo[i].pFilter) {
freeColumnFilterInfo(pExprInfo[i].pFilter, pExprInfo[i].base.filterNum);
}
} }
tfree(pExprInfo); tfree(pExprInfo);

View File

@ -310,6 +310,77 @@ tSqlExpr *tSqlExprCreate(tSqlExpr *pLeft, tSqlExpr *pRight, int32_t optrType) {
return pExpr; return pExpr;
} }
static FORCE_INLINE int32_t tStrTokenCompare(SStrToken* left, SStrToken* right) {
return (left->type == right->type && left->n == right->n && strncasecmp(left->z, right->z, left->n) == 0) ? 0 : 1;
}
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right) {
if ((left == NULL && right) || (left && right == NULL)) {
return 1;
}
if (left->type != right->type) {
return 1;
}
if (left->tokenId != right->tokenId) {
return 1;
}
if (left->functionId != right->functionId) {
return 1;
}
if ((left->pLeft && right->pLeft == NULL)
|| (left->pLeft == NULL && right->pLeft)
|| (left->pRight && right->pRight == NULL)
|| (left->pRight == NULL && right->pRight)
|| (left->pParam && right->pParam == NULL)
|| (left->pParam == NULL && right->pParam)) {
return 1;
}
if (tVariantCompare(&left->value, &right->value)) {
return 1;
}
if (tStrTokenCompare(&left->colInfo, &right->colInfo)) {
return 1;
}
if (right->pParam && left->pParam) {
size_t size = taosArrayGetSize(right->pParam);
if (left->pParam && taosArrayGetSize(left->pParam) != size) {
return 1;
}
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pLeftElem = taosArrayGet(left->pParam, i);
tSqlExpr* pSubLeft = pLeftElem->pNode;
tSqlExprItem* pRightElem = taosArrayGet(left->pParam, i);
tSqlExpr* pSubRight = pRightElem->pNode;
if (tSqlExprCompare(pSubLeft, pSubRight)) {
return 1;
}
}
}
if (left->pLeft && tSqlExprCompare(left->pLeft, right->pLeft)) {
return 1;
}
if (left->pRight && tSqlExprCompare(left->pRight, right->pRight)) {
return 1;
}
return 0;
}
tSqlExpr *tSqlExprClone(tSqlExpr *pSrc) { tSqlExpr *tSqlExprClone(tSqlExpr *pSrc) {
tSqlExpr *pExpr = calloc(1, sizeof(tSqlExpr)); tSqlExpr *pExpr = calloc(1, sizeof(tSqlExpr));
@ -640,7 +711,7 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere, SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SFromInfo *pFrom, tSqlExpr *pWhere,
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SSessionWindowVal *pSession, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
SLimitVal *psLimit) { SLimitVal *psLimit, tSqlExpr *pHaving) {
assert(pSelectList != NULL); assert(pSelectList != NULL);
SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode)); SQuerySqlNode *pSqlNode = calloc(1, sizeof(SQuerySqlNode));
@ -655,6 +726,7 @@ SQuerySqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelectList, SF
pSqlNode->pSortOrder = pSortOrder; pSqlNode->pSortOrder = pSortOrder;
pSqlNode->pWhere = pWhere; pSqlNode->pWhere = pWhere;
pSqlNode->fillType = pFill; pSqlNode->fillType = pFill;
pSqlNode->pHaving = pHaving;
if (pLimit != NULL) { if (pLimit != NULL) {
pSqlNode->limit = *pLimit; pSqlNode->limit = *pLimit;
@ -718,6 +790,9 @@ void destroyQuerySqlNode(SQuerySqlNode *pQuerySql) {
tSqlExprDestroy(pQuerySql->pWhere); tSqlExprDestroy(pQuerySql->pWhere);
pQuerySql->pWhere = NULL; pQuerySql->pWhere = NULL;
tSqlExprDestroy(pQuerySql->pHaving);
pQuerySql->pHaving = NULL;
taosArrayDestroyEx(pQuerySql->pSortOrder, freeVariant); taosArrayDestroyEx(pQuerySql->pSortOrder, freeVariant);
pQuerySql->pSortOrder = NULL; pQuerySql->pSortOrder = NULL;

View File

@ -140,7 +140,7 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i];
int16_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes; int16_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes;
char * s = getPosInResultPage(pRuntimeEnv->pQuery, page, pResultRow->offset, offset); char * s = getPosInResultPage(pRuntimeEnv, page, pResultRow->offset, offset, size);
memset(s, 0, size); memset(s, 0, size);
offset += size; offset += size;

File diff suppressed because it is too large Load Diff

View File

@ -551,7 +551,10 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
if (pPeer->peerFd >= 0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
void *pConn = pPeer->pConn; void *pConn = pPeer->pConn;
if (pConn != NULL) syncFreeTcpConn(pPeer->pConn); if (pConn != NULL) {
syncFreeTcpConn(pPeer->pConn);
pPeer->pConn = NULL;
}
} }
} }

View File

@ -19,6 +19,10 @@ Run lua sample:
lua test.lua lua test.lua
``` ```
## Run performance test:
```
time lua benchmark.lua
```
## OpenResty Dependencies ## OpenResty Dependencies
- OpenResty: - OpenResty:
``` ```

View File

@ -0,0 +1,67 @@
local driver = require "luaconnector"
local config = {
password = "taosdata",
host = "127.0.0.1",
port = 6030,
database = "",
user = "root",
max_packet_size = 1024 * 1024
}
local conn
local res = driver.connect(config)
if res.code ~=0 then
print("connect--- failed: "..res.error)
return
else
conn = res.conn
print("connect--- pass.")
end
local res = driver.query(conn,"drop database if exists demo")
res = driver.query(conn,"create database demo")
if res.code ~=0 then
print("create db--- failed: "..res.error)
return
else
print("create db--- pass.")
end
res = driver.query(conn,"use demo")
if res.code ~=0 then
print("select db--- failed: "..res.error)
return
else
print("select db--- pass.")
end
res = driver.query(conn,"create table m1 (ts timestamp, speed int,owner binary(20))")
if res.code ~=0 then
print("create table---failed: "..res.error)
return
else
print("create table--- pass.")
end
local base = 1617330000000
local index =0
local count = 100000
local t
while( index < count )
do
t = base + index
local q=string.format([[insert into m1 values (%d,0,'robotspace')]],t)
res = driver.query(conn,q)
if res.code ~=0 then
print("insert records failed: "..res.error)
return
else
end
index = index+1
end
print(string.format([["Done. %d records has been stored."]],count))
driver.close(conn)

View File

@ -1,2 +1,2 @@
gcc lua_connector.c -fPIC -shared -o luaconnector.so -Wall -ltaos gcc -std=c99 lua_connector.c -fPIC -shared -o luaconnector.so -Wall -ltaos

View File

@ -1,2 +1,2 @@
gcc lua_connector51.c -fPIC -shared -o luaconnector51.so -Wall -ltaos gcc -std=c99 lua_connector51.c -fPIC -shared -o luaconnector51.so -Wall -ltaos

View File

@ -23,7 +23,7 @@ static int l_connect(lua_State *L){
luaL_checktype(L, 1, LUA_TTABLE); luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L,-1,"host"); lua_getfield(L,1,"host");
if (lua_isstring(L,-1)){ if (lua_isstring(L,-1)){
host = lua_tostring(L, -1); host = lua_tostring(L, -1);
// printf("host = %s\n", host); // printf("host = %s\n", host);

View File

@ -217,8 +217,8 @@ python3 ./test.py -f query/floatCompare.py
python3 ./test.py -f query/query1970YearsAf.py python3 ./test.py -f query/query1970YearsAf.py
python3 ./test.py -f query/bug3351.py python3 ./test.py -f query/bug3351.py
python3 ./test.py -f query/bug3375.py python3 ./test.py -f query/bug3375.py
python3 ./test.py -f query/queryJoin10tables.py
python3 ./test.py -f query/queryStddevWithGroupby.py
#stream #stream
python3 ./test.py -f stream/metric_1.py python3 ./test.py -f stream/metric_1.py

View File

@ -0,0 +1,201 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import taos
import sys
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def createtable(self):
# create stbles
tdSql.execute("create table if not exists stb1 (ts timestamp, c1 int) tags(t11 int, t12 int)")
tdSql.execute("create table if not exists stb2 (ts timestamp, c2 int) tags(t21 int, t22 int)")
tdSql.execute("create table if not exists stb3 (ts timestamp, c3 int) tags(t31 int, t32 int)")
tdSql.execute("create table if not exists stb4 (ts timestamp, c4 int) tags(t41 int, t42 int)")
tdSql.execute("create table if not exists stb5 (ts timestamp, c5 int) tags(t51 int, t52 int)")
tdSql.execute("create table if not exists stb6 (ts timestamp, c6 int) tags(t61 int, t62 int)")
tdSql.execute("create table if not exists stb7 (ts timestamp, c7 int) tags(t71 int, t72 int)")
tdSql.execute("create table if not exists stb8 (ts timestamp, c8 int) tags(t81 int, t82 int)")
tdSql.execute("create table if not exists stb9 (ts timestamp, c9 int) tags(t91 int, t92 int)")
tdSql.execute("create table if not exists stb10 (ts timestamp, c10 int) tags(t101 int, t102 int)")
tdSql.execute("create table if not exists stb11 (ts timestamp, c11 int) tags(t111 int, t112 int)")
# create normal tables
tdSql.execute("create table t10 using stb1 tags(0, 9)")
tdSql.execute("create table t11 using stb1 tags(1, 8)")
tdSql.execute("create table t12 using stb1 tags(2, 7)")
tdSql.execute("create table t13 using stb1 tags(3, 6)")
tdSql.execute("create table t14 using stb1 tags(4, 5)")
tdSql.execute("create table t15 using stb1 tags(5, 4)")
tdSql.execute("create table t16 using stb1 tags(6, 3)")
tdSql.execute("create table t17 using stb1 tags(7, 2)")
tdSql.execute("create table t18 using stb1 tags(8, 1)")
tdSql.execute("create table t19 using stb1 tags(9, 0)")
tdSql.execute("create table t110 using stb1 tags(10, 10)")
tdSql.execute("create table t20 using stb2 tags(0, 9)")
tdSql.execute("create table t21 using stb2 tags(1, 8)")
tdSql.execute("create table t22 using stb2 tags(2, 7)")
tdSql.execute("create table t30 using stb3 tags(0, 9)")
tdSql.execute("create table t31 using stb3 tags(1, 8)")
tdSql.execute("create table t32 using stb3 tags(2, 7)")
def inserttable(self):
for i in range(100):
if i<60:
tdSql.execute(f"insert into t20 values('2020-10-01 00:00:{i}.000', {i})")
tdSql.execute(f"insert into t21 values('2020-10-01 00:00:{i}.000', {i})")
tdSql.execute(f"insert into t22 values('2020-10-01 00:00:{i}.000', {i})")
tdSql.execute(f"insert into t30 values('2020-10-01 00:00:{i}.000', {i})")
tdSql.execute(f"insert into t31 values('2020-10-01 00:00:{i}.000', {i})")
tdSql.execute(f"insert into t32 values('2020-10-01 00:00:{i}.000', {i})")
else:
tdSql.execute(f"insert into t20 values('2020-10-01 00:01:{i-60}.000', {i})")
tdSql.execute(f"insert into t21 values('2020-10-01 00:01:{i-60}.000', {i})")
tdSql.execute(f"insert into t22 values('2020-10-01 00:01:{i-60}.000', {i})")
tdSql.execute(f"insert into t30 values('2020-10-01 00:01:{i-60}.000', {i})")
tdSql.execute(f"insert into t31 values('2020-10-01 00:01:{i-60}.000', {i})")
tdSql.execute(f"insert into t32 values('2020-10-01 00:01:{i-60}.000', {i})")
for j in range(11):
if i<60:
tdSql.execute(f"insert into t1{j} values('2020-10-01 00:00:{i}.000', {i})")
else:
tdSql.execute(f"insert into t1{j} values('2020-10-01 00:01:{i-60}.000', {i})")
def queryjointable(self):
tdSql.error(
'''select from t10,t11,t12,t13,t14,t15,t16,t17,t18,t19
where t10.ts=t11.ts and t10.ts=t12.ts and t10.ts=t13.ts and t10.ts=t14.ts and t10.ts=t15.ts
and t10.ts=t16.ts and t10.ts=t17.ts and t10.ts=t18.ts and t10.ts=t19.ts'''
)
tdSql.error("select * from t10 where t10.ts=t11.ts")
tdSql.error("select * from where t10.ts=t11.ts")
tdSql.error("select * from t10,t11,t12,t13,t14,t15,t16,t17,t18,t19")
tdSql.error("select * from stb1, stb2, stb3 where stb1.ts=stb2.ts and stb1.ts=stb3.ts")
tdSql.error("select * from stb1, stb2, stb3 where stb1.t11=stb2.t21 and stb1.t11=stb3.t31")
tdSql.error("select * from stb1, stb2, stb3")
tdSql.error(
'''select * from stb1
join stb2 on stb1.ts=stb2.ts and stb1.t11=stb2.t21
join stb3 on stb1.ts=stb3.ts and stb1.t11=stb3.t31'''
)
tdSql.error("select * from t10 join t11 on t10.ts=t11.ts join t12 on t11.ts=t12.ts")
tdSql.query(
'''select * from stb1,stb2,stb3
where stb1.ts=stb2.ts and stb1.ts=stb3.ts and stb1.t11=stb2.t21 and stb1.t11 =stb3.t31'''
)
tdSql.checkRows(300)
tdSql.query("select * from t11,t12,t13 where t11.ts=t12.ts and t11.ts=t13.ts")
tdSql.checkRows(100)
tdSql.error("selec * from t11,t12,t13 where t11.ts=t12.ts and t11.ts=t13.ts")
tdSql.error("select * form t11,t12,t13 where t11.ts=t12.ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts <> t12.ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts != t12.ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts or t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.c1=t12.c2 and t11.c1=t13.c3")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.c3 and t11.c1=t13.ts")
tdSql.error("select ts from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.ts and ts>100")
tdSql.error("select * from t11,t12,stb1 when t11.ts=t12.ts and t11.ts=stb1.ts")
tdSql.error("select t14.ts from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.ts1")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t14.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts")
tdSql.error("select * from t11,t12,t13 when t11.ts=t12.ts and t11.ts=t13.ts and t11.c1=t13.c3")
tdSql.error(
'''select * from t10,t11,t12,t13,t14,t15,t16,t17,t18,t19,t20
where t10.ts=t11.ts and t10.ts=t12.ts and t10.ts=t13.ts and t10.ts=t14.ts and t10.ts=t15.ts
and t10.ts=t16.ts and t10.ts=t17.ts and t10.ts=t18.ts and t10.ts=t19.ts and t10.ts=t20.ts'''
)
tdSql.error(
'''select * from t10,t11,t12,t13,t14,t15,t16,t17,t18,t19,t20
where t10.ts=t11.ts and t10.ts=t12.ts and t10.ts=t13.ts and t10.ts=t14.ts and t10.ts=t15.ts
and t10.ts=t16.ts and t10.ts=t17.ts and t10.ts=t18.ts and t10.ts=t19.ts'''
)
tdSql.error(
'''select * from t10,t11,t12,t13,t14,t15,t16,t17,t18,t19
where t10.ts=t11.ts and t10.ts=t12.ts and t10.ts=t13.ts and t10.ts=t14.ts and t10.ts=t15.ts
and t10.ts=t16.ts and t10.ts=t17.ts and t10.ts=t18.ts and t10.ts=t19.ts and t10.c1=t19.c1'''
)
tdSql.error(
'''select * from stb1,stb2,stb3
where stb1.ts=stb2.ts and stb1.ts=stb3.ts and stb1.t11=stb2.t21'''
)
tdSql.error(
'''select * from stb1,stb2,stb3
where stb1.ts=stb2.ts and stb1.t11=stb2.t21 and stb1.t11=stb3.t31'''
)
tdSql.error(
'''select * from stb1,stb2,stb3
where stb1.ts=stb2.ts and stb1.ts=stb3.ts and stb1.t11=stb2.t21 and stb1.t11=stb3.t31
and stb1.t12=stb3=t32'''
)
tdSql.error(
'''select * from stb1,stb2,stb3,stb4,stb5,stb6,stb7,stb8,stb9,stb10,stb11
where stb1.ts=stb2.ts and stb1.ts=stb3.ts and stb1.ts=stb4.ts and stb1.ts=stb5.ts and stb1.ts=stb6.ts
and stb1.ts=stb7.ts and stb1.ts=stb8.ts and stb1.ts=stb9.ts and stb1.ts=stb10.ts and stb1.ts=stb11.ts
and stb1.t11=stb2.t21 and stb1.t11=stb3.t31 and stb1.t11=stb4.t41 and stb1.t11=stb5.t51
and stb1.t11=stb6.t61 and stb1.t11=stb7.t71 and stb1.t11=stb8.t81 and stb1.t11=stb9.t91
and stb1.t11=stb10.t101 and stb1.t11=stb11.t111'''
)
tdSql.error(
'''select * from stb1,stb2,stb3,stb4,stb5,stb6,stb7,stb8,stb9,stb10
where stb1.ts=stb2.ts and stb1.ts=stb3.ts and stb1.ts=stb4.ts and stb1.ts=stb5.ts and stb1.ts=stb6.ts
and stb1.ts=stb7.ts and stb1.ts=stb8.ts and stb1.ts=stb9.ts and stb1.ts=stb10.ts and stb1.t11=stb2.t21
and stb1.t11=stb3.t31 and stb1.t11=stb4.t41 and stb1.t11=stb5.t51 and stb1.t11=stb6.t61
and stb1.t11=stb7.t71 and stb1.t11=stb8.t81 and stb1.t11=stb9.t91 and stb1.t11=stb10.t101
and stb1.t12=stb11.t102'''
)
def run(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table")
self.createtable()
tdLog.printNoPrefix("==========step2:insert data")
self.inserttable()
tdLog.printNoPrefix("==========step3:query timestamp type")
self.queryjointable()
# after wal and sync, check again
tdSql.query("show dnodes")
index = tdSql.getData(0, 0)
tdDnodes.stop(index)
tdDnodes.start(index)
tdLog.printNoPrefix("==========step4:query again after wal")
self.queryjointable()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,68 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def querysqls(self):
tdSql.query("select stddev(c1) from t10 group by c1")
tdSql.checkRows(6)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 0)
tdSql.checkData(2, 0, 0)
tdSql.checkData(3, 0, 0)
tdSql.checkData(4, 0, 0)
tdSql.checkData(5, 0, 0)
tdSql.query("select stddev(c2) from t10")
tdSql.checkData(0, 0, 0.5)
def run(self):
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db keep 36500")
tdSql.execute("use db")
tdLog.printNoPrefix("==========step1:create table && insert data")
tdSql.execute("create stable stb1 (ts timestamp , c1 int ,c2 float) tags(t1 int)")
tdSql.execute("create table t10 using stb1 tags(1)")
tdSql.execute("insert into t10 values ('1969-12-31 00:00:00.000', 2,1)")
tdSql.execute("insert into t10 values ('1970-01-01 00:00:00.000', 3,1)")
tdSql.execute("insert into t10 values (0, 4,1)")
tdSql.execute("insert into t10 values (now-18725d, 1,2)")
tdSql.execute("insert into t10 values ('2021-04-06 00:00:00.000', 5,2)")
tdSql.execute("insert into t10 values (now+1d,6,2)")
tdLog.printNoPrefix("==========step2:query and check")
self.querysqls()
tdLog.printNoPrefix("==========step3:after wal,check again")
tdSql.query("show dnodes")
index = tdSql.getData(0, 0)
tdDnodes.stop(index)
tdDnodes.start(index)
self.querysqls()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -220,6 +220,7 @@ sql_error select sum(c3), ts, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select sum(c3), first(ts), c2 from group_tb0 where c1 < 20 group by c1; sql_error select sum(c3), first(ts), c2 from group_tb0 where c1 < 20 group by c1;
sql_error select first(c3), ts, c1, c2 from group_tb0 where c1 < 20 group by c1; sql_error select first(c3), ts, c1, c2 from group_tb0 where c1 < 20 group by c1;
sql_error select first(c3), last(c3), ts, c1 from group_tb0 where c1 < 20 group by c1; sql_error select first(c3), last(c3), ts, c1 from group_tb0 where c1 < 20 group by c1;
sql_error select ts from group_tb0 group by c1;
#===========================interval=====not support====================== #===========================interval=====not support======================
sql_error select count(*), c1 from group_tb0 where c1<20 interval(1y) group by c1; sql_error select count(*), c1 from group_tb0 where c1<20 interval(1y) group by c1;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff